Skip to content

Commit

Permalink
feat: support filter pushdown for datafusion (apache#203)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Shiyan Xu <[email protected]>
  • Loading branch information
jonathanc-n and xushiyan authored Dec 8, 2024
1 parent b2c60b1 commit eb0f520
Show file tree
Hide file tree
Showing 13 changed files with 820 additions and 230 deletions.
3 changes: 3 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub enum CoreError {
#[error("Config error: {0}")]
Config(#[from] ConfigError),

#[error("Data type error: {0}")]
DataType(String),

#[error("File group error: {0}")]
FileGroup(String),

Expand Down
52 changes: 52 additions & 0 deletions crates/core/src/expr/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use crate::error::CoreError;
use crate::expr::ExprOperator;
use crate::Result;
use std::str::FromStr;

#[derive(Debug, Clone)]
pub struct Filter {
pub field_name: String,
pub operator: ExprOperator,
pub field_value: String,
}

impl Filter {}

impl TryFrom<(&str, &str, &str)> for Filter {
type Error = CoreError;

fn try_from(binary_expr_tuple: (&str, &str, &str)) -> Result<Self, Self::Error> {
let (field_name, operator_str, field_value) = binary_expr_tuple;

let field_name = field_name.to_string();

let operator = ExprOperator::from_str(operator_str)?;

let field_value = field_value.to_string();

Ok(Filter {
field_name,
operator,
field_value,
})
}
}
118 changes: 118 additions & 0 deletions crates/core/src/expr/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

pub mod filter;

use crate::error::CoreError;
use crate::error::CoreError::Unsupported;

use std::cmp::PartialEq;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::str::FromStr;

/// An operator that represents a comparison operation used in a partition filter expression.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ExprOperator {
Eq,
Ne,
Lt,
Lte,
Gt,
Gte,
}

impl Display for ExprOperator {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
// Binary Operators
ExprOperator::Eq => write!(f, "="),
ExprOperator::Ne => write!(f, "!="),
ExprOperator::Lt => write!(f, "<"),
ExprOperator::Lte => write!(f, "<="),
ExprOperator::Gt => write!(f, ">"),
ExprOperator::Gte => write!(f, ">="),
}
}
}

impl ExprOperator {
pub const TOKEN_OP_PAIRS: [(&'static str, ExprOperator); 6] = [
("=", ExprOperator::Eq),
("!=", ExprOperator::Ne),
("<", ExprOperator::Lt),
("<=", ExprOperator::Lte),
(">", ExprOperator::Gt),
(">=", ExprOperator::Gte),
];

/// Negates the operator.
pub fn negate(&self) -> Option<ExprOperator> {
match self {
ExprOperator::Eq => Some(ExprOperator::Ne),
ExprOperator::Ne => Some(ExprOperator::Eq),
ExprOperator::Lt => Some(ExprOperator::Gte),
ExprOperator::Lte => Some(ExprOperator::Gt),
ExprOperator::Gt => Some(ExprOperator::Lte),
ExprOperator::Gte => Some(ExprOperator::Lt),
}
}
}

impl FromStr for ExprOperator {
type Err = CoreError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
ExprOperator::TOKEN_OP_PAIRS
.iter()
.find_map(|&(token, op)| {
if token.eq_ignore_ascii_case(s) {
Some(op)
} else {
None
}
})
.ok_or_else(|| Unsupported(format!("Unsupported operator: {}", s)))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_operator_from_str() {
assert_eq!(ExprOperator::from_str("=").unwrap(), ExprOperator::Eq);
assert_eq!(ExprOperator::from_str("!=").unwrap(), ExprOperator::Ne);
assert_eq!(ExprOperator::from_str("<").unwrap(), ExprOperator::Lt);
assert_eq!(ExprOperator::from_str("<=").unwrap(), ExprOperator::Lte);
assert_eq!(ExprOperator::from_str(">").unwrap(), ExprOperator::Gt);
assert_eq!(ExprOperator::from_str(">=").unwrap(), ExprOperator::Gte);
assert!(ExprOperator::from_str("??").is_err());
}

#[test]
fn test_operator_display() {
assert_eq!(ExprOperator::Eq.to_string(), "=");
assert_eq!(ExprOperator::Ne.to_string(), "!=");
assert_eq!(ExprOperator::Lt.to_string(), "<");
assert_eq!(ExprOperator::Lte.to_string(), "<=");
assert_eq!(ExprOperator::Gt.to_string(), ">");
assert_eq!(ExprOperator::Gte.to_string(), ">=");
}
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
pub mod config;
pub mod error;
pub mod expr;
pub mod file_group;
pub mod storage;
pub mod table;
Expand Down
8 changes: 7 additions & 1 deletion crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ impl FileSystemView {
mod tests {
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
use crate::expr::filter::Filter;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::Table;

use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -298,12 +300,16 @@ mod tests {
.await
.unwrap();
let partition_schema = hudi_table.get_partition_schema().await.unwrap();

let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
let filter_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap();
let partition_pruner = PartitionPruner::new(
&[("byteField", "<", "20"), ("shortField", "=", "300")],
&[filter_lt_20, filter_eq_300],
&partition_schema,
hudi_table.hudi_configs.as_ref(),
)
.unwrap();

let file_slices = fs_view
.get_file_slices_as_of("20240418173235694", &partition_pruner, excludes)
.await
Expand Down
Loading

0 comments on commit eb0f520

Please sign in to comment.