Skip to content

Commit

Permalink
fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb committed Dec 16, 2024
1 parent 943c384 commit fda2ec7
Showing 1 changed file with 14 additions and 145 deletions.
159 changes: 14 additions & 145 deletions datafusion/physical-optimizer/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1607,138 +1607,11 @@ fn build_statistics_expr(
Ok(statistics_expr)
}

<<<<<<< Updated upstream
/// Wrap the statistics expression in a case expression.
/// This is necessary to handle the case where the column is known
/// to be all nulls.
=======
/// Convert `column LIKE literal` where P is a constant prefix of the literal
/// to a range check on the column: `P <= column && column < P'`, where P' is the
/// lowest string after all P* strings.
fn build_like_match(
expr_builder: &mut PruningExpressionBuilder,
) -> Option<Arc<dyn PhysicalExpr>> {
// column LIKE literal => (min, max) LIKE literal split at % => min <= split literal && split literal <= max
// column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
// column LIKE '%foo' => min <= '' && '' <= max => true
// column LIKE '%foo%' => min <= '' && '' <= max => true
// column LIKE 'foo' => min <= 'foo' && 'foo' <= max

fn unpack_string(s: &ScalarValue) -> Option<&String> {
match s {
ScalarValue::Utf8(Some(s)) => Some(s),
ScalarValue::LargeUtf8(Some(s)) => Some(s),
ScalarValue::Utf8View(Some(s)) => Some(s),
ScalarValue::Dictionary(_, value) => unpack_string(value),
_ => None,
}
}

fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&String> {
if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
let s = unpack_string(lit.value())?;
return Some(s);
}
None
}

// TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
// this may involve building the physical expressions that call lower() and upper()
let min_column_expr = expr_builder.min_column_expr().ok()?;
let max_column_expr = expr_builder.max_column_expr().ok()?;
let scalar_expr = expr_builder.scalar_expr();
// check that the scalar is a string literal
let s = extract_string_literal(scalar_expr)?;
// ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
let first_wildcard_index = s.find(['%', '_']);
if first_wildcard_index == Some(0) {
// there's no filtering we could possibly do, return an error and have this be handled by the unhandled hook
return None;
}
let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
let prefix = &s[..wildcard_index];
let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
prefix.to_string(),
))));
let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
increment_utf8(prefix)?,
))));
(lower_bound_lit, upper_bound_lit)
} else {
// the like expression is a literal and can be converted into a comparison
let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(s.clone()))));
(bound.clone(), bound)
};
let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
lower_bound,
Operator::LtEq,
max_column_expr.clone(),
));
let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
min_column_expr.clone(),
Operator::LtEq,
upper_bound,
));
let combined = Arc::new(phys_expr::BinaryExpr::new(
upper_bound_expr,
Operator::And,
lower_bound_expr,
));
Some(combined)
}

/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
/// This makes it so that the returned string will always compare greater than the input string
/// or any other string with the same prefix.
/// This is necessary since the statistics may have been truncated: if we have a min statistic
/// of "fo" that may have originally been "foz" or anything else with the prefix "fo".
/// E.g. `increment_utf8("foo") >= "foo"` and `increment_utf8("foo") >= "fooz"`
/// In this example `increment_utf8("foo") == "fop"
fn increment_utf8(data: &str) -> Option<String> {
// Helper function to check if a character is valid to use
fn is_valid_unicode(c: char) -> bool {
let cp = c as u32;

// Filter out non-characters (https://www.unicode.org/versions/corrigendum9.html)
if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
return false;
}

// Filter out private use area
if cp >= 0x110000 {
return false;
}

true
}

// Convert string to vector of code points
let mut code_points: Vec<char> = data.chars().collect();

// Work backwards through code points
for idx in (0..code_points.len()).rev() {
let original = code_points[idx] as u32;

// Try incrementing the code point
if let Some(next_char) = char::from_u32(original + 1) {
if is_valid_unicode(next_char) {
code_points[idx] = next_char;
// truncate the string to the current index
code_points.truncate(idx + 1);
return Some(code_points.into_iter().collect());
}
}
}

None
}

/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
/// This is important not only as an optimization but also because statistics may not be
/// accurate for columns that are all nulls.
/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
>>>>>>> Stashed changes
///
/// For example:
///
Expand Down Expand Up @@ -1767,15 +1640,11 @@ fn wrap_null_count_check_expr(
let not_when_null_count_eq_row_count = phys_expr::not(when_null_count_eq_row_count)?;

// NOT (x_null_count = x_row_count) AND (<statistics_expr>)
Ok(
Arc::new(
phys_expr::BinaryExpr::new(
not_when_null_count_eq_row_count,
Operator::And,
statistics_expr,
)
)
)
Ok(Arc::new(phys_expr::BinaryExpr::new(
not_when_null_count_eq_row_count,
Operator::And,
statistics_expr,
)))
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -2364,7 +2233,8 @@ mod tests {
#[test]
fn row_group_predicate_eq() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let expected_expr = "NOT c1_null_count@2 = c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
let expected_expr =
"NOT c1_null_count@2 = c1_row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";

// test column on the left
let expr = col("c1").eq(lit(1));
Expand All @@ -2384,7 +2254,8 @@ mod tests {
#[test]
fn row_group_predicate_not_eq() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let expected_expr = "NOT c1_null_count@2 = c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
let expected_expr =
"NOT c1_null_count@2 = c1_row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";

// test column on the left
let expr = col("c1").not_eq(lit(1));
Expand All @@ -2404,8 +2275,7 @@ mod tests {
#[test]
fn row_group_predicate_gt() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let expected_expr =
"NOT c1_null_count@1 = c1_row_count@2 AND c1_max@0 > 1";
let expected_expr = "NOT c1_null_count@1 = c1_row_count@2 AND c1_max@0 > 1";

// test column on the left
let expr = col("c1").gt(lit(1));
Expand Down Expand Up @@ -2444,8 +2314,7 @@ mod tests {
#[test]
fn row_group_predicate_lt() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let expected_expr =
"NOT c1_null_count@1 = c1_row_count@2 AND c1_min@0 < 1";
let expected_expr = "NOT c1_null_count@1 = c1_row_count@2 AND c1_min@0 < 1";

// test column on the left
let expr = col("c1").lt(lit(1));
Expand Down Expand Up @@ -2490,8 +2359,7 @@ mod tests {
]);
// test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
let expected_expr =
"NOT c1_null_count@1 = c1_row_count@2 AND c1_min@0 < 1";
let expected_expr = "NOT c1_null_count@1 = c1_row_count@2 AND c1_min@0 < 1";
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);
Expand Down Expand Up @@ -2800,7 +2668,8 @@ mod tests {
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);

let expected_expr = "NOT c1_null_count@1 = c1_row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
let expected_expr =
"NOT c1_null_count@1 = c1_row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";

// test column on the left
let expr =
Expand Down

0 comments on commit fda2ec7

Please sign in to comment.