Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace usages of internal_err with exec_err where appropriate #9241

Merged
merged 8 commits into from
Feb 27, 2024
13 changes: 6 additions & 7 deletions datafusion/functions/src/core/nullif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
//! NULLIF expression

use arrow::datatypes::DataType;
use datafusion_common::{internal_err, Result, DataFusionError};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use arrow::array::Array;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::kernels::nullif::nullif;
use datafusion_common::ScalarValue;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;

#[derive(Debug)]
pub(super) struct NullIfFunc {
Expand Down Expand Up @@ -58,7 +58,7 @@ impl NullIfFunc {
Self {
signature:
Signature::uniform(2, SUPPORTED_NULLIF_TYPES.to_vec(),
Volatility::Immutable,
Volatility::Immutable,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a strange change -- I wonder if cargo fmt doesn't see this file for some reason 🤔

Copy link
Contributor

@alamb alamb Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see now #9281. I made a PR #9367 to fix the issue

)
}
}
Expand All @@ -81,7 +81,7 @@ impl ScalarUDFImpl for NullIfFunc {
let coerced_types = datafusion_expr::type_coercion::functions::data_types(arg_types, &self.signature);
coerced_types.map(|typs| typs[0].clone())
.map_err(|e| e.context("Failed to coerce arguments for NULLIF")
)
)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Expand All @@ -90,14 +90,13 @@ impl ScalarUDFImpl for NullIfFunc {
}



/// Implements NULLIF(expr1, expr2)
/// Args: 0 - left expr is any array
/// 1 - if the left is equal to this expr2, then the result is NULL, otherwise left value is passed.
///
fn nullif_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but NULLIF takes exactly two args",
args.len()
);
Expand Down
34 changes: 18 additions & 16 deletions datafusion/functions/src/encoding/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use arrow::{
datatypes::DataType,
};
use base64::{engine::general_purpose, Engine as _};
use datafusion_common::ScalarValue;
use datafusion_common::{
cast::{as_generic_binary_array, as_generic_string_array},
internal_err, not_impl_err, plan_err,
not_impl_err, plan_err,
};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;
Expand Down Expand Up @@ -111,6 +111,7 @@ impl DecodeFunc {
}
}
}

impl ScalarUDFImpl for DecodeFunc {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -148,14 +149,15 @@ enum Encoding {
Base64,
Hex,
}

fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
match value {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function encode({encoding})"
),
},
Expand All @@ -171,7 +173,7 @@ fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
),
ScalarValue::LargeBinary(a) => Ok(encoding
.encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function encode({encoding})"
),
}
Expand All @@ -186,7 +188,7 @@ fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function decode({encoding})"
),
},
Expand All @@ -202,7 +204,7 @@ fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarV
}
ScalarValue::LargeBinary(a) => encoding
.decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function decode({encoding})"
),
}
Expand Down Expand Up @@ -270,8 +272,8 @@ impl Encoding {
}

fn encode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_binary_array::<T>(value)?;
let array: ArrayRef = match self {
Expand All @@ -282,8 +284,8 @@ impl Encoding {
}

fn encode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_string_array::<T>(value)?;
let array: ArrayRef = match self {
Expand Down Expand Up @@ -350,8 +352,8 @@ impl Encoding {
}

fn decode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_binary_array::<T>(value)?;
let array: ArrayRef = match self {
Expand All @@ -362,8 +364,8 @@ impl Encoding {
}

fn decode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: OffsetSizeTrait,
where
T: OffsetSizeTrait,
{
let input_value = as_generic_string_array::<T>(value)?;
let array: ArrayRef = match self {
Expand Down Expand Up @@ -405,7 +407,7 @@ impl FromStr for Encoding {
/// Standard encodings are base64 and hex.
fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but encode takes exactly two arguments",
args.len()
);
Expand All @@ -431,7 +433,7 @@ fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
/// Standard encodings are base64 and hex.
fn decode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but decode takes exactly two arguments",
args.len()
);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/functions/src/math/abs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use arrow::array::Int32Array;
use arrow::array::Int64Array;
use arrow::array::Int8Array;
use arrow::datatypes::DataType;
use datafusion_common::not_impl_err;
use datafusion_common::{exec_err, not_impl_err};
use datafusion_common::plan_datafusion_err;
use datafusion_common::{internal_err, Result, DataFusionError};
use datafusion_common::{Result, DataFusionError};
use datafusion_expr::utils;
use datafusion_expr::ColumnarValue;

Expand Down Expand Up @@ -165,7 +165,7 @@ impl ScalarUDFImpl for AbsFunc {
let args = ColumnarValue::values_to_arrays(args)?;

if args.len() != 1 {
return internal_err!("abs function requires 1 argument, got {}", args.len());
return exec_err!("abs function requires 1 argument, got {}", args.len());
}

let input_data_type = args[0].data_type();
Expand Down
10 changes: 5 additions & 5 deletions datafusion/functions/src/math/nans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
//! NaN-related math expressions (e.g. `isnan`)

use arrow::datatypes::DataType;
use datafusion_common::{internal_err, Result, DataFusionError};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

use arrow::array::{ArrayRef, BooleanArray, Float32Array, Float64Array};
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::sync::Arc;
use arrow::array::{ArrayRef, BooleanArray, Float32Array, Float64Array};

#[derive(Debug)]
pub(super) struct IsNanFunc {
Expand Down Expand Up @@ -73,7 +73,7 @@ impl ScalarUDFImpl for IsNanFunc {
BooleanArray,
{ f64::is_nan }
))
},
}
DataType::Float32 => {
Arc::new(make_function_scalar_inputs_return_type!(
&args[0],
Expand All @@ -82,8 +82,8 @@ impl ScalarUDFImpl for IsNanFunc {
BooleanArray,
{ f32::is_nan }
))
},
other => return internal_err!("Unsupported data type {other:?} for function isnan"),
}
other => return exec_err!("Unsupported data type {other:?} for function isnan"),
};
Ok(ColumnarValue::Array(arr))
}
Expand Down
25 changes: 13 additions & 12 deletions datafusion/physical-expr/src/aggregate/build_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@

use std::sync::Arc;

use arrow::datatypes::Schema;

use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::AggregateFunction;

use crate::aggregate::regr::RegrType;
use crate::expressions::{self, Literal};
use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};

use arrow::datatypes::Schema;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::AggregateFunction;

/// Create a physical aggregation expression.
/// This function errors when `input_phy_exprs` can't be coerced to a valid argument type of the aggregation function.
pub fn create_aggregate_expr(
Expand Down Expand Up @@ -379,9 +380,7 @@ pub fn create_aggregate_expr(
.downcast_ref::<Literal>()
.map(|literal| literal.value())
else {
return internal_err!(
"Second argument of NTH_VALUE needs to be a literal"
);
return exec_err!("Second argument of NTH_VALUE needs to be a literal");
};
let nullable = expr.nullable(input_schema)?;
Arc::new(expressions::NthValueAgg::new(
Expand Down Expand Up @@ -415,17 +414,19 @@ pub fn create_aggregate_expr(

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field};

use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{type_coercion, Signature};

use crate::expressions::{
try_cast, ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg,
BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Correlation, Count, Covariance,
DistinctArrayAgg, DistinctCount, Max, Min, Stddev, Sum, Variance,
};

use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{type_coercion, Signature};
use super::*;

#[test]
fn test_count_arragg_approx_expr() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/conditional_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use arrow::array::{new_null_array, Array, BooleanArray};
use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, is_not_null, is_null};

use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

/// coalesce evaluates to the first value which is not NULL
pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
// do not accept 0 arguments.
if args.is_empty() {
return internal_err!(
return exec_err!(
"coalesce was called with {} arguments. It requires at least 1.",
args.len()
);
Expand Down
16 changes: 8 additions & 8 deletions datafusion/physical-expr/src/crypto_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use arrow::{
};
use blake2::{Blake2b512, Blake2s256, Digest};
use blake3::Hasher as Blake3;
use datafusion_common::ScalarValue;
use datafusion_common::{
cast::{as_binary_array, as_generic_binary_array, as_generic_string_array},
plan_err,
};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use md5::Md5;
Expand Down Expand Up @@ -66,7 +66,7 @@ fn digest_process(
DataType::LargeBinary => {
digest_algorithm.digest_binary_array::<i64>(a.as_ref())
}
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
},
Expand All @@ -77,7 +77,7 @@ fn digest_process(
}
ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm
.digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
other => internal_err!(
other => exec_err!(
"Unsupported data type {other:?} for function {digest_algorithm}"
),
},
Expand Down Expand Up @@ -238,7 +238,7 @@ macro_rules! define_digest_function {
#[doc = $DOC]
pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but {} takes exactly one argument",
args.len(),
DigestAlgorithm::$METHOD.to_string()
Expand All @@ -264,7 +264,7 @@ fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
/// computes md5 hash digest of the given input
pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but {} takes exactly one argument",
args.len(),
DigestAlgorithm::Md5
Expand All @@ -284,7 +284,7 @@ pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
}
_ => return internal_err!("Impossibly got invalid results from digest"),
_ => return exec_err!("Impossibly got invalid results from digest"),
})
}

Expand Down Expand Up @@ -329,7 +329,7 @@ define_digest_function!(
/// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512.
pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!(
return exec_err!(
"{:?} args were supplied but digest takes exactly two arguments",
args.len()
);
Expand All @@ -339,7 +339,7 @@ pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
ScalarValue::Utf8(Some(method)) | ScalarValue::LargeUtf8(Some(method)) => {
method.parse::<DigestAlgorithm>()
}
other => internal_err!("Unsupported data type {other:?} for function digest"),
other => exec_err!("Unsupported data type {other:?} for function digest"),
},
ColumnarValue::Array(_) => {
internal_err!("Digest using dynamically decided method is not yet supported")
Expand Down
Loading
Loading