Skip to content

Commit

Permalink
move string kernels
Browse files Browse the repository at this point in the history
  • Loading branch information
rluvaton committed Jan 5, 2025
1 parent ac4ce18 commit 67f054b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 67 deletions.
1 change: 0 additions & 1 deletion native/spark-expr/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,4 @@

//! Kernels
pub mod strings;
pub(crate) mod temporal;
63 changes: 0 additions & 63 deletions native/spark-expr/src/kernels/strings.rs

This file was deleted.

49 changes: 46 additions & 3 deletions native/spark-expr/src/string_funcs/substring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

#![allow(deprecated)]

use crate::kernels::strings::substring;
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Schema};
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::DataFusionError;
use datafusion_physical_expr::PhysicalExpr;
Expand All @@ -29,6 +27,16 @@ use std::{
hash::Hash,
sync::Arc,
};
use arrow_array::{make_array, Array, ArrayRef, DictionaryArray, LargeStringArray, StringArray};
use arrow_array::cast::as_dictionary_array;

use arrow::{
array::*
,
compute::kernels::substring::{substring as arrow_substring, substring_by_char},
datatypes::{Schema, DataType, Int32Type},
};


#[derive(Debug, Eq)]
pub struct SubstringExpr {
Expand Down Expand Up @@ -84,7 +92,7 @@ impl PhysicalExpr for SubstringExpr {
let arg = self.child.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => {
let result = substring(&array, self.start, self.len)?;
let result = substring_kernel(&array, self.start, self.len)?;

Ok(ColumnarValue::Array(result))
}
Expand All @@ -110,3 +118,38 @@ impl PhysicalExpr for SubstringExpr {
}
}


pub fn substring_kernel(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef, DataFusionError> {
match array.data_type() {
DataType::LargeUtf8 => substring_by_char(
array
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("A large string is expected"),
start,
Some(length),
)
.map_err(|e| e.into())
.map(|t| make_array(t.into_data())),
DataType::Utf8 => substring_by_char(
array
.as_any()
.downcast_ref::<StringArray>()
.expect("A string is expected"),
start,
Some(length),
)
.map_err(|e| e.into())
.map(|t| make_array(t.into_data())),
DataType::Binary | DataType::LargeBinary => {
arrow_substring(array, start, Some(length)).map_err(|e| e.into())
}
DataType::Dictionary(_, _) => {
let dict = as_dictionary_array::<Int32Type>(array);
let values = substring_kernel(dict.values(), start, length)?;
let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
Ok(Arc::new(result))
}
dt => panic!("Unsupported input type for function 'substring': {:?}", dt),
}
}

0 comments on commit 67f054b

Please sign in to comment.