Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: extract strings file to strings_func like in spark grouping #1215

Merged
merged 5 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ mod bitwise_not;
pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
mod checkoverflow;
pub use checkoverflow::CheckOverflow;
mod strings;
pub use strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr};

mod kernels;
mod list;
mod regexp;
Expand All @@ -54,6 +53,8 @@ pub use normalize_nan::NormalizeNaNAndZero;

mod agg_funcs;
mod comet_scalar_funcs;
mod string_funcs;

mod datetime_funcs;
pub use agg_funcs::*;

Expand All @@ -66,6 +67,7 @@ pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
pub use list::{ArrayInsert, GetArrayStructFields, ListExtract};
pub use regexp::RLike;
pub use string_funcs::*;
pub use struct_funcs::*;
pub use to_json::ToJson;

Expand Down
24 changes: 24 additions & 0 deletions native/spark-expr/src/string_funcs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod prediction;
mod string_space;
mod substring;

pub use prediction::*;
pub use string_space::StringSpaceExpr;
pub use substring::SubstringExpr;
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#![allow(deprecated)]

use crate::kernels::strings::{string_space, substring};
use arrow::{
compute::{
contains_dyn, contains_utf8_scalar_dyn, ends_with_dyn, ends_with_utf8_scalar_dyn, like_dyn,
Expand Down Expand Up @@ -136,155 +135,3 @@ make_predicate_function!(StartsWith, starts_with_dyn, starts_with_utf8_scalar_dy
make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn);

make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn);

#[derive(Debug, Eq)]
pub struct SubstringExpr {
pub child: Arc<dyn PhysicalExpr>,
pub start: i64,
pub len: u64,
}

impl Hash for SubstringExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.child.hash(state);
self.start.hash(state);
self.len.hash(state);
}
}

impl PartialEq for SubstringExpr {
fn eq(&self, other: &Self) -> bool {
self.child.eq(&other.child) && self.start.eq(&other.start) && self.len.eq(&other.len)
}
}
#[derive(Debug, Eq)]
pub struct StringSpaceExpr {
pub child: Arc<dyn PhysicalExpr>,
}

impl Hash for StringSpaceExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.child.hash(state);
}
}

impl PartialEq for StringSpaceExpr {
fn eq(&self, other: &Self) -> bool {
self.child.eq(&other.child)
}
}

impl SubstringExpr {
pub fn new(child: Arc<dyn PhysicalExpr>, start: i64, len: u64) -> Self {
Self { child, start, len }
}
}

impl StringSpaceExpr {
pub fn new(child: Arc<dyn PhysicalExpr>) -> Self {
Self { child }
}
}

impl Display for SubstringExpr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"StringSpace [start: {}, len: {}, child: {}]",
self.start, self.len, self.child
)
}
}

impl Display for StringSpaceExpr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "StringSpace [child: {}] ", self.child)
}
}

impl PhysicalExpr for SubstringExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
self.child.data_type(input_schema)
}

fn nullable(&self, _: &Schema) -> datafusion_common::Result<bool> {
Ok(true)
}

fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
let arg = self.child.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => {
let result = substring(&array, self.start, self.len)?;

Ok(ColumnarValue::Array(result))
}
_ => Err(DataFusionError::Execution(
"Substring(scalar) should be fold in Spark JVM side.".to_string(),
)),
}
}

fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(SubstringExpr::new(
Arc::clone(&children[0]),
self.start,
self.len,
)))
}
}

impl PhysicalExpr for StringSpaceExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
match self.child.data_type(input_schema)? {
DataType::Dictionary(key_type, _) => {
Ok(DataType::Dictionary(key_type, Box::new(DataType::Utf8)))
}
_ => Ok(DataType::Utf8),
}
}

fn nullable(&self, _: &Schema) -> datafusion_common::Result<bool> {
Ok(true)
}

fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
let arg = self.child.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => {
let result = string_space(&array)?;

Ok(ColumnarValue::Array(result))
}
_ => Err(DataFusionError::Execution(
"StringSpace(scalar) should be fold in Spark JVM side.".to_string(),
)),
}
}

fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(StringSpaceExpr::new(Arc::clone(&children[0]))))
}
}
104 changes: 104 additions & 0 deletions native/spark-expr/src/string_funcs/string_space.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#![allow(deprecated)]

use crate::kernels::strings::string_space;
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Schema};
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::DataFusionError;
use datafusion_physical_expr::PhysicalExpr;
use std::{
any::Any,
fmt::{Display, Formatter},
hash::Hash,
sync::Arc,
};
andygrove marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, Eq)]
pub struct StringSpaceExpr {
pub child: Arc<dyn PhysicalExpr>,
}

impl Hash for StringSpaceExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.child.hash(state);
}
}

impl PartialEq for StringSpaceExpr {
fn eq(&self, other: &Self) -> bool {
self.child.eq(&other.child)
}
}

impl StringSpaceExpr {
pub fn new(child: Arc<dyn PhysicalExpr>) -> Self {
Self { child }
}
}

impl Display for StringSpaceExpr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "StringSpace [child: {}] ", self.child)
}
}

impl PhysicalExpr for StringSpaceExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
match self.child.data_type(input_schema)? {
DataType::Dictionary(key_type, _) => {
Ok(DataType::Dictionary(key_type, Box::new(DataType::Utf8)))
}
_ => Ok(DataType::Utf8),
}
}

fn nullable(&self, _: &Schema) -> datafusion_common::Result<bool> {
Ok(true)
}

fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
let arg = self.child.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => {
let result = string_space(&array)?;

Ok(ColumnarValue::Array(result))
}
_ => Err(DataFusionError::Execution(
"StringSpace(scalar) should be fold in Spark JVM side.".to_string(),
)),
}
}

fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(StringSpaceExpr::new(Arc::clone(&children[0]))))
}
}
Loading
Loading