Skip to content

Commit

Permalink
Replace usages of internal_err with exec_err where appropriate (#9241)
Browse files Browse the repository at this point in the history
* internal_err! -> exec_err!

* fmt updates.

* Updated error macro from exec_err! to not_impl_err! for all unsupported type errors.

* Revert "Updated error macro from exec_err! to not_impl_err! for all unsupported type errors."

This reverts commit fe0517a.

* Updated a few instances of internal_err missed in previous audit.

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
Omega359 and alamb authored Feb 27, 2024
1 parent acd09da commit c439bc7
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 180 deletions.
13 changes: 6 additions & 7 deletions datafusion/functions/src/core/nullif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
//! Encoding expressions
use arrow::datatypes::DataType;
use datafusion_common::{internal_err, Result, DataFusionError};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use arrow::array::Array;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::kernels::nullif::nullif;
use datafusion_common::ScalarValue;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;

#[derive(Debug)]
pub(super) struct NullIfFunc {
Expand Down Expand Up @@ -58,7 +58,7 @@ impl NullIfFunc {
Self {
signature:
Signature::uniform(2, SUPPORTED_NULLIF_TYPES.to_vec(),
Volatility::Immutable,
Volatility::Immutable,
)
}
}
Expand All @@ -81,7 +81,7 @@ impl ScalarUDFImpl for NullIfFunc {
let coerced_types = datafusion_expr::type_coercion::functions::data_types(arg_types, &self.signature);
coerced_types.map(|typs| typs[0].clone())
.map_err(|e| e.context("Failed to coerce arguments for NULLIF")
)
)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Expand All @@ -90,14 +90,13 @@ impl ScalarUDFImpl for NullIfFunc {
}



/// Implements NULLIF(expr1, expr2)
/// Args: 0 - left expr is any array
/// 1 - if the left is equal to this expr2, then the result is NULL, otherwise left value is passed.
///
fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but NULLIF takes exactly two args",
args.len()
);
Expand Down
34 changes: 18 additions & 16 deletions datafusion/functions/src/encoding/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use arrow::{
datatypes::DataType,
};
use base64::{engine::general_purpose, Engine as _};
use datafusion_common::ScalarValue;
use datafusion_common::{
cast::{as_generic_binary_array, as_generic_string_array},
internal_err, not_impl_err, plan_err,
not_impl_err, plan_err,
};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;
Expand Down Expand Up @@ -111,6 +111,7 @@ impl DecodeFunc {
}
}
}

impl ScalarUDFImpl for DecodeFunc {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -148,14 +149,15 @@ enum Encoding {
Base64,
Hex,
}

fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
match value {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function encode({encoding})"
),
},
Expand All @@ -171,7 +173,7 @@ fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
),
ScalarValue::LargeBinary(a) => Ok(encoding
.encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function encode({encoding})"
),
}
Expand All @@ -186,7 +188,7 @@ fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function decode({encoding})"
),
},
Expand All @@ -202,7 +204,7 @@ fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
}
ScalarValue::LargeBinary(a) => encoding
.decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function decode({encoding})"
),
}
Expand Down Expand Up @@ -270,8 +272,8 @@ impl Encoding {
}

fn encode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_binary_array::<T>(value)?;
let array: ArrayRef = match self {
Expand All @@ -282,8 +284,8 @@ impl Encoding {
}

fn encode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_string_array::<T>(value)?;
let array: ArrayRef = match self {
Expand Down Expand Up @@ -350,8 +352,8 @@ impl Encoding {
}

fn decode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_binary_array::<T>(value)?;
let array: ArrayRef = match self {
Expand All @@ -362,8 +364,8 @@ impl Encoding {
}

fn decode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_string_array::<T>(value)?;
let array: ArrayRef = match self {
Expand Down Expand Up @@ -405,7 +407,7 @@ impl FromStr for Encoding {
/// Standard encodings are base64 and hex.
fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but encode takes exactly two arguments",
args.len()
);
Expand All @@ -431,7 +433,7 @@ fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
/// Standard encodings are base64 and hex.
fn decode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but decode takes exactly two arguments",
args.len()
);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/functions/src/math/abs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use arrow::array::Int32Array;
use arrow::array::Int64Array;
use arrow::array::Int8Array;
use arrow::datatypes::DataType;
use datafusion_common::not_impl_err;
use datafusion_common::{exec_err, not_impl_err};
use datafusion_common::plan_datafusion_err;
use datafusion_common::{internal_err, Result, DataFusionError};
use datafusion_common::{Result, DataFusionError};
use datafusion_expr::utils;
use datafusion_expr::ColumnarValue;

Expand Down Expand Up @@ -165,7 +165,7 @@ impl ScalarUDFImpl for AbsFunc {
let args = ColumnarValue::values_to_arrays(args)?;

if args.len() != 1 {
return internal_err!("abs function requires 1 argument, got {}", args.len());
return exec_err!("abs function requires 1 argument, got {}", args.len());
}

let input_data_type = args[0].data_type();
Expand Down
10 changes: 5 additions & 5 deletions datafusion/functions/src/math/nans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
//! Encoding expressions
use arrow::datatypes::DataType;
use datafusion_common::{internal_err, Result, DataFusionError};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

use arrow::array::{ArrayRef, BooleanArray, Float32Array, Float64Array};
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::sync::Arc;
use arrow::array::{ArrayRef, BooleanArray, Float32Array, Float64Array};

#[derive(Debug)]
pub(super) struct IsNanFunc {
Expand Down Expand Up @@ -73,7 +73,7 @@ impl ScalarUDFImpl for IsNanFunc {
BooleanArray,
{ f64::is_nan }
))
},
}
DataType::Float32 => {
Arc::new(make_function_scalar_inputs_return_type!(
&args[0],
Expand All @@ -82,8 +82,8 @@ impl ScalarUDFImpl for IsNanFunc {
BooleanArray,
{ f32::is_nan }
))
},
other => return internal_err!("Unsupported data type {other:?} for function isnan"),
}
other => return exec_err!("Unsupported data type {other:?} for function isnan"),
};
Ok(ColumnarValue::Array(arr))
}
Expand Down
25 changes: 13 additions & 12 deletions datafusion/physical-expr/src/aggregate/build_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
use std::sync::Arc;

use arrow::datatypes::Schema;

use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::AggregateFunction;

use crate::aggregate::regr::RegrType;
use crate::expressions::{self, Literal};
use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};

use arrow::datatypes::Schema;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::AggregateFunction;

/// Create a physical aggregation expression.
/// This function errors when `input_phy_exprs`' can't be coerced to a valid argument type of the aggregation function.
pub fn create_aggregate_expr(
Expand Down Expand Up @@ -379,9 +380,7 @@ pub fn create_aggregate_expr(
.downcast_ref::<Literal>()
.map(|literal| literal.value())
else {
return internal_err!(
"Second argument of NTH_VALUE needs to be a literal"
);
return exec_err!("Second argument of NTH_VALUE needs to be a literal");
};
let nullable = expr.nullable(input_schema)?;
Arc::new(expressions::NthValueAgg::new(
Expand Down Expand Up @@ -415,17 +414,19 @@ pub fn create_aggregate_expr(

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field};

use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{type_coercion, Signature};

use crate::expressions::{
try_cast, ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg,
BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Correlation, Count, Covariance,
DistinctArrayAgg, DistinctCount, Max, Min, Stddev, Sum, Variance,
};

use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{type_coercion, Signature};
use super::*;

#[test]
fn test_count_arragg_approx_expr() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/conditional_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use arrow::array::{new_null_array, Array, BooleanArray};
use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, is_not_null, is_null};

use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

/// coalesce evaluates to the first value which is not NULL
pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
// do not accept 0 arguments.
if args.is_empty() {
return internal_err!(
return exec_err!(
"coalesce was called with {} arguments. It requires at least 1.",
args.len()
);
Expand Down
16 changes: 8 additions & 8 deletions datafusion/physical-expr/src/crypto_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use arrow::{
};
use blake2::{Blake2b512, Blake2s256, Digest};
use blake3::Hasher as Blake3;
use datafusion_common::ScalarValue;
use datafusion_common::{
cast::{as_binary_array, as_generic_binary_array, as_generic_string_array},
plan_err,
};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use md5::Md5;
Expand Down Expand Up @@ -66,7 +66,7 @@ fn digest_process(
DataType::LargeBinary => {
digest_algorithm.digest_binary_array::<i64>(a.as_ref())
}
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
},
Expand All @@ -77,7 +77,7 @@ fn digest_process(
}
ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm
.digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
},
Expand Down Expand Up @@ -238,7 +238,7 @@ macro_rules! define_digest_function {
#[doc = $DOC]
pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but {} takes exactly one argument",
args.len(),
DigestAlgorithm::$METHOD.to_string()
Expand All @@ -264,7 +264,7 @@ fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
/// computes md5 hash digest of the given input
pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but {} takes exactly one argument",
args.len(),
DigestAlgorithm::Md5
Expand All @@ -284,7 +284,7 @@ pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
}
_ => return internal_err!("Impossibly got invalid results from digest"),
_ => return exec_err!("Impossibly got invalid results from digest"),
})
}

Expand Down Expand Up @@ -329,7 +329,7 @@ define_digest_function!(
/// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512.
pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but digest takes exactly two arguments",
args.len()
);
Expand All @@ -339,7 +339,7 @@ pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
ScalarValue::Utf8(Some(method)) | ScalarValue::LargeUtf8(Some(method)) => {
method.parse::<DigestAlgorithm>()
}
other => internal_err!("Unsupported data type {other:?} for function digest"),
other => exec_err!("Unsupported data type {other:?} for function digest"),
},
ColumnarValue::Array(_) => {
internal_err!("Digest using dynamically decided method is not yet supported")
Expand Down
Loading

0 comments on commit c439bc7

Please sign in to comment.