diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 5bffb9ee1c..c7c54a4e9f 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -27,8 +27,7 @@ mod bitwise_not; pub use bitwise_not::{bitwise_not, BitwiseNotExpr}; mod checkoverflow; pub use checkoverflow::CheckOverflow; -mod strings; -pub use strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr}; + mod kernels; mod list; mod regexp; @@ -54,6 +53,8 @@ pub use normalize_nan::NormalizeNaNAndZero; mod agg_funcs; mod comet_scalar_funcs; +mod string_funcs; + mod datetime_funcs; pub use agg_funcs::*; @@ -66,6 +67,7 @@ pub use error::{SparkError, SparkResult}; pub use if_expr::IfExpr; pub use list::{ArrayInsert, GetArrayStructFields, ListExtract}; pub use regexp::RLike; +pub use string_funcs::*; pub use struct_funcs::*; pub use to_json::ToJson; diff --git a/native/spark-expr/src/string_funcs/mod.rs b/native/spark-expr/src/string_funcs/mod.rs new file mode 100644 index 0000000000..2c2a5b37c7 --- /dev/null +++ b/native/spark-expr/src/string_funcs/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +mod prediction; +mod string_space; +mod substring; + +pub use prediction::*; +pub use string_space::StringSpaceExpr; +pub use substring::SubstringExpr; diff --git a/native/spark-expr/src/strings.rs b/native/spark-expr/src/string_funcs/prediction.rs similarity index 54% rename from native/spark-expr/src/strings.rs rename to native/spark-expr/src/string_funcs/prediction.rs index c2706b5896..d2ef82fcbe 100644 --- a/native/spark-expr/src/strings.rs +++ b/native/spark-expr/src/string_funcs/prediction.rs @@ -17,7 +17,6 @@ #![allow(deprecated)] -use crate::kernels::strings::{string_space, substring}; use arrow::{ compute::{ contains_dyn, contains_utf8_scalar_dyn, ends_with_dyn, ends_with_utf8_scalar_dyn, like_dyn, @@ -136,155 +135,3 @@ make_predicate_function!(StartsWith, starts_with_dyn, starts_with_utf8_scalar_dy make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn); make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn); - -#[derive(Debug, Eq)] -pub struct SubstringExpr { - pub child: Arc, - pub start: i64, - pub len: u64, -} - -impl Hash for SubstringExpr { - fn hash(&self, state: &mut H) { - self.child.hash(state); - self.start.hash(state); - self.len.hash(state); - } -} - -impl PartialEq for SubstringExpr { - fn eq(&self, other: &Self) -> bool { - self.child.eq(&other.child) && self.start.eq(&other.start) && self.len.eq(&other.len) - } -} -#[derive(Debug, Eq)] -pub struct StringSpaceExpr { - pub child: Arc, -} - -impl Hash for StringSpaceExpr { - fn hash(&self, state: &mut H) { - self.child.hash(state); - } -} - -impl PartialEq for StringSpaceExpr { - fn eq(&self, other: &Self) -> bool { - self.child.eq(&other.child) - } -} - -impl SubstringExpr { - pub fn new(child: Arc, start: i64, len: u64) -> Self { - Self { child, start, len } - } -} - -impl StringSpaceExpr { - pub fn new(child: Arc) -> Self { - Self { child } - } -} - -impl Display for SubstringExpr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - 
write!( - f, - "StringSpace [start: {}, len: {}, child: {}]", - self.start, self.len, self.child - ) - } -} - -impl Display for StringSpaceExpr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "StringSpace [child: {}] ", self.child) - } -} - -impl PhysicalExpr for SubstringExpr { - fn as_any(&self) -> &dyn Any { - self - } - - fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result { - self.child.data_type(input_schema) - } - - fn nullable(&self, _: &Schema) -> datafusion_common::Result { - Ok(true) - } - - fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result { - let arg = self.child.evaluate(batch)?; - match arg { - ColumnarValue::Array(array) => { - let result = substring(&array, self.start, self.len)?; - - Ok(ColumnarValue::Array(result)) - } - _ => Err(DataFusionError::Execution( - "Substring(scalar) should be fold in Spark JVM side.".to_string(), - )), - } - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.child] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> datafusion_common::Result> { - Ok(Arc::new(SubstringExpr::new( - Arc::clone(&children[0]), - self.start, - self.len, - ))) - } -} - -impl PhysicalExpr for StringSpaceExpr { - fn as_any(&self) -> &dyn Any { - self - } - - fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result { - match self.child.data_type(input_schema)? 
{ - DataType::Dictionary(key_type, _) => { - Ok(DataType::Dictionary(key_type, Box::new(DataType::Utf8))) - } - _ => Ok(DataType::Utf8), - } - } - - fn nullable(&self, _: &Schema) -> datafusion_common::Result { - Ok(true) - } - - fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result { - let arg = self.child.evaluate(batch)?; - match arg { - ColumnarValue::Array(array) => { - let result = string_space(&array)?; - - Ok(ColumnarValue::Array(result)) - } - _ => Err(DataFusionError::Execution( - "StringSpace(scalar) should be fold in Spark JVM side.".to_string(), - )), - } - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.child] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> datafusion_common::Result> { - Ok(Arc::new(StringSpaceExpr::new(Arc::clone(&children[0])))) - } -} diff --git a/native/spark-expr/src/string_funcs/string_space.rs b/native/spark-expr/src/string_funcs/string_space.rs new file mode 100644 index 0000000000..db70929057 --- /dev/null +++ b/native/spark-expr/src/string_funcs/string_space.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#![allow(deprecated)]
+
+use crate::kernels::strings::string_space;
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Schema};
+use datafusion::logical_expr::ColumnarValue;
+use datafusion_common::DataFusionError;
+use datafusion_physical_expr::PhysicalExpr;
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+    hash::Hash,
+    sync::Arc,
+};
+
+#[derive(Debug, Eq)]
+pub struct StringSpaceExpr {
+    pub child: Arc<dyn PhysicalExpr>, // input expression, evaluated once per batch
+}
+
+impl Hash for StringSpaceExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+    }
+}
+
+impl PartialEq for StringSpaceExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child)
+    }
+}
+
+impl StringSpaceExpr {
+    pub fn new(child: Arc<dyn PhysicalExpr>) -> Self {
+        Self { child }
+    }
+}
+
+impl Display for StringSpaceExpr {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "StringSpace [child: {}] ", self.child)
+    }
+}
+
+impl PhysicalExpr for StringSpaceExpr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
+        match self.child.data_type(input_schema)? {
+            DataType::Dictionary(key_type, _) => {
+                Ok(DataType::Dictionary(key_type, Box::new(DataType::Utf8)))
+            }
+            _ => Ok(DataType::Utf8), // any non-dictionary child yields plain Utf8
+        }
+    }
+
+    fn nullable(&self, _: &Schema) -> datafusion_common::Result<bool> {
+        Ok(true) // always reported as nullable, independent of the child
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
+        let arg = self.child.evaluate(batch)?;
+        match arg {
+            ColumnarValue::Array(array) => {
+                let result = string_space(&array)?;
+
+                Ok(ColumnarValue::Array(result))
+            }
+            _ => Err(DataFusionError::Execution(
+                "StringSpace(scalar) should be fold in Spark JVM side.".to_string(),
+            )),
+        }
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
+        Ok(Arc::new(StringSpaceExpr::new(Arc::clone(&children[0]))))
+    }
+}
diff --git a/native/spark-expr/src/string_funcs/substring.rs b/native/spark-expr/src/string_funcs/substring.rs
new file mode 100644
index 0000000000..741ea9139d
--- /dev/null
+++ b/native/spark-expr/src/string_funcs/substring.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(deprecated)]
+
+use crate::kernels::strings::substring;
+use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Schema};
+use datafusion::logical_expr::ColumnarValue;
+use datafusion_common::DataFusionError;
+use datafusion_physical_expr::PhysicalExpr;
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+    hash::Hash,
+    sync::Arc,
+};
+
+/// Physical expression that applies the `substring` kernel to the array produced by `child`.
+///
+/// `start` and `len` are fixed when the expression is built and are passed to the kernel
+/// unchanged on every batch.
+#[derive(Debug, Eq)]
+pub struct SubstringExpr {
+    /// Expression whose evaluated array is handed to the kernel.
+    pub child: Arc<dyn PhysicalExpr>,
+    /// Start position passed to the kernel.
+    pub start: i64,
+    /// Length passed to the kernel.
+    pub len: u64,
+}
+
+impl Hash for SubstringExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.child.hash(state);
+        self.start.hash(state);
+        self.len.hash(state);
+    }
+}
+
+impl PartialEq for SubstringExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.child.eq(&other.child) && self.start.eq(&other.start) && self.len.eq(&other.len)
+    }
+}
+
+impl SubstringExpr {
+    /// Creates a substring expression over `child` with the given `start` and `len`.
+    pub fn new(child: Arc<dyn PhysicalExpr>, start: i64, len: u64) -> Self {
+        Self { child, start, len }
+    }
+}
+
+impl Display for SubstringExpr {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        // Use this expression's own name so its output cannot be mistaken for `StringSpaceExpr`.
+        write!(
+            f,
+            "Substring [start: {}, len: {}, child: {}]",
+            self.start, self.len, self.child
+        )
+    }
+}
+
+impl PhysicalExpr for SubstringExpr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// The output type is whatever type the child reports.
+    fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
+        self.child.data_type(input_schema)
+    }
+
+    /// Always reported as nullable, regardless of the child.
+    fn nullable(&self, _: &Schema) -> datafusion_common::Result<bool> {
+        Ok(true)
+    }
+
+    /// Evaluates `child` on `batch` and runs the `substring` kernel on the resulting array.
+    ///
+    /// # Errors
+    ///
+    /// Returns `DataFusionError::Execution` when the child yields a non-array value, and
+    /// propagates any error from the child or the kernel.
+    fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
+        let arg = self.child.evaluate(batch)?;
+        match arg {
+            ColumnarValue::Array(array) => {
+                let result = substring(&array, self.start, self.len)?;
+
+                Ok(ColumnarValue::Array(result))
+            }
+            _ => Err(DataFusionError::Execution(
+                "Substring(scalar) should be fold in Spark JVM side.".to_string(),
+            )),
+        }
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
+    }
+
+    /// Rebuilds the expression around `children[0]`, keeping `start` and `len`.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
+        Ok(Arc::new(SubstringExpr::new(
+            Arc::clone(&children[0]),
+            self.start,
+            self.len,
+        )))
+    }
+}