From 5ca2ef89308e24da4605551b9a88aad41ddafba4 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Sat, 14 Dec 2024 00:55:46 +0800 Subject: [PATCH] [WIP] Fix filter pushdown with schema evolution --- .../apache/paimon/casting/CastExecutors.java | 119 +++++++++++ .../operation/AppendOnlyFileStoreScan.java | 4 +- .../operation/KeyValueFileStoreScan.java | 2 +- .../paimon/schema/SchemaEvolutionUtil.java | 48 ++--- .../paimon/stats/SimpleStatsEvolutions.java | 21 +- .../paimon/utils/FormatReaderMapping.java | 2 +- .../schema/SchemaEvolutionUtilTest.java | 4 +- paimon-flink/paimon-flink-common/pom.xml | 4 + .../FilterPushdownWithSchemaChangeITCase.java | 197 ++++++++++++++++++ 9 files changed, 361 insertions(+), 40 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java index 8134e0118bf8c..892af6ad7a53f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -18,17 +18,32 @@ package org.apache.paimon.casting; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.LeafFunction; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.LessThan; +import org.apache.paimon.predicate.NotEqual; +import org.apache.paimon.predicate.NotIn; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeFamily; import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DecimalType; import javax.annotation.Nullable; +import java.math.BigDecimal; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; /** Cast executors for input type and output type. */ @@ -100,6 +115,110 @@ public class CastExecutors { return IDENTITY_CAST_EXECUTOR; } + @Nullable + public static List castPredicateLiteralsEvolutionSafely( + LeafPredicate predicate, DataType outputType) { + DataType inputType = predicate.type(); + if (inputType.equalsIgnoreNullable(outputType)) { + return predicate.literals(); + } + + List literals = predicate.literals(); + + CastRule castRule = INSTANCE.internalResolve(inputType, outputType); + if (castRule == null) { + return literals; + } + + if (castRule instanceof DecimalToDecimalCastRule) { + if (((DecimalType) inputType).getPrecision() < ((DecimalType) outputType).getPrecision() + && containsEqualCheck(predicate)) { + return null; + } + return literals; + } else if (castRule instanceof NumericPrimitiveToDecimalCastRule) { + if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && containsEqualCheck(predicate)) { + return null; + } + return literals.stream() + .map(literal -> (Number) literal) + .map( + literal -> + inputType.is(DataTypeFamily.INTEGER_NUMERIC) + ? BigDecimal.valueOf(literal.longValue()) + : BigDecimal.valueOf(literal.doubleValue())) + .map(bd -> Decimal.fromBigDecimal(bd, bd.precision(), bd.scale())) + .collect(Collectors.toList()); + } else if (castRule instanceof DecimalToNumericPrimitiveCastRule) { + if (outputType.is(DataTypeFamily.INTEGER_NUMERIC) + && (containsPartialCheck(predicate) || containsNotEqualCheck(predicate))) { + // TODO: scale ? + return null; + } else if (containsEqualCheck(predicate)) { + return null; + } + + CastExecutor castExecutor = + (CastExecutor) castRule.create(inputType, outputType); + return literals.stream().map(castExecutor::cast).collect(Collectors.toList()); + } else if (castRule instanceof NumericPrimitiveCastRule) { + if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) + && outputType.is(DataTypeFamily.INTEGER_NUMERIC)) { + if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) { + return literals; + } else { + // the data may be overflow, don't cast the predicate to avoid complexity + return null; + } + } + + // otherwise, the data may be overflow, don't cast the predicate to avoid complexity + return null; + } else if (castRule instanceof NumericToStringCastRule) { + // the data may be overflow, don't cast the predicate to avoid complexity + return null; + } else if (castRule instanceof StringToDecimalCastRule) { + // dangerous + return null; + } else if (castRule instanceof StringToNumericPrimitiveCastRule) { + return null; + } + + // TODO comment + CastExecutor castExecutor = + (CastExecutor) castRule.create(inputType, outputType); + return literals.stream() + .map(l -> castExecutor == null ? l : castExecutor.cast(l)) + .collect(Collectors.toList()); + } + + private static boolean containsEqualCheck(LeafPredicate predicate) { + LeafFunction function = predicate.function(); + return function instanceof In + || function instanceof Equal + || function instanceof GreaterOrEqual + || function instanceof LessOrEqual; + } + + private static boolean containsPartialCheck(LeafPredicate predicate) { + LeafFunction function = predicate.function(); + return function instanceof LessThan + || function instanceof LessOrEqual + || function instanceof GreaterThan + || function instanceof GreaterOrEqual; + } + + private static boolean containsNotEqualCheck(LeafPredicate predicate) { + LeafFunction function = predicate.function(); + return function instanceof NotIn || function instanceof NotEqual; + } + + private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) { + return (a == DataTypeRoot.SMALLINT && b == DataTypeRoot.TINYINT) + || (a == DataTypeRoot.INTEGER && b != DataTypeRoot.BIGINT) + || a == DataTypeRoot.BIGINT; + } + // Map> private final Map>> rules = new HashMap<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index d2ca5da42249a..88e6bbadf5904 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -109,7 +109,9 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry Predicate dataPredicate = dataFilterMapping.computeIfAbsent( entry.file().schemaId(), - id -> simpleStatsEvolutions.convertFilter(entry.file().schemaId(), filter)); + id -> + simpleStatsEvolutions.tryEvolveFilter( + entry.file().schemaId(), filter)); try (FileIndexPredicate predicate = new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 8d8c51996cfe9..e838612f3cd77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -158,7 +158,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE schemaId2DataFilter.computeIfAbsent( entry.file().schemaId(), id -> - fieldValueStatsConverters.convertFilter( + fieldValueStatsConverters.tryEvolveFilter( entry.file().schemaId(), valueFilter)); return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index 0ae2798c29e00..11ba161a70683 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -276,8 +276,9 @@ public static int[][] createDataProjection( } /** - * Create predicate list from data fields. We will visit all predicate in filters, reset it's - * field index, name and type, and ignore predicate if the field is not exist. + * Evolve data field filters because of schema evolution. We will visit all predicate in + * filters, reset its field index, name and type, and ignore predicate if the field is not + * exist. * * @param tableFields the table fields * @param dataFields the underlying data fields @@ -285,7 +286,7 @@ public static int[][] createDataProjection( * @return the data filters */ @Nullable - public static List createDataFilters( + public static List evolveDataFilters( List tableFields, List dataFields, List filters) { if (filters == null) { return null; @@ -308,29 +309,24 @@ public static List createDataFilters( return Optional.empty(); } - DataType dataValueType = dataField.type().copy(true); - DataType predicateType = predicate.type().copy(true); - CastExecutor castExecutor = - dataValueType.equals(predicateType) - ? null - : (CastExecutor) - CastExecutors.resolve( - predicate.type(), dataField.type()); - // Convert value from predicate type to underlying data type which may lose - // information, for example, convert double value to int. But it doesn't matter - // because it just for predicate push down and the data will be filtered - // correctly after reading. - List literals = - predicate.literals().stream() - .map(v -> castExecutor == null ? v : castExecutor.cast(v)) - .collect(Collectors.toList()); - return Optional.of( - new LeafPredicate( - predicate.function(), - dataField.type(), - indexOf(dataField, idToDataFields), - dataField.name(), - literals)); + DataType dataValueType = dataField.type(); + DataType predicateType = predicate.type(); + + List evolvedLiterals = + dataValueType.equalsIgnoreNullable(predicateType) + ? predicate.literals() + : CastExecutors.castPredicateLiteralsEvolutionSafely( + predicate, dataValueType); + + return evolvedLiterals == null + ? Optional.empty() + : Optional.of( + new LeafPredicate( + predicate.function(), + dataField.type(), + indexOf(dataField, idToDataFields), + dataField.name(), + evolvedLiterals)); }; for (Predicate predicate : filters) { diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index a0814b8c04c41..1b4313d27e2fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -77,15 +77,18 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) { }); } - public Predicate convertFilter(long dataSchemaId, Predicate filter) { - return tableSchemaId == dataSchemaId - ? filter - : Objects.requireNonNull( - SchemaEvolutionUtil.createDataFilters( - schemaFields.apply(tableSchemaId), - schemaFields.apply(dataSchemaId), - Collections.singletonList(filter))) - .get(0); + @Nullable + public Predicate tryEvolveFilter(long dataSchemaId, Predicate filter) { + if (tableSchemaId == dataSchemaId) { + return filter; + } + List evolved = + Objects.requireNonNull( + SchemaEvolutionUtil.evolveDataFilters( + schemaFields.apply(tableSchemaId), + schemaFields.apply(dataSchemaId), + Collections.singletonList(filter))); + return evolved.isEmpty() ? null : evolved.get(0); } public List tableDataFields() { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index f6c6287f51b40..8ebc68d2413fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -290,7 +290,7 @@ private List readFilters( List dataFilters = tableSchema.id() == dataSchema.id() ? filters - : SchemaEvolutionUtil.createDataFilters( + : SchemaEvolutionUtil.evolveDataFilters( tableSchema.fields(), dataSchema.fields(), filters); // Skip pushing down partition filters to reader. diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 9d947f76d9951..dfa1038d6f636 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -278,7 +278,7 @@ public void testCreateDataFilters() { IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList())); List filters = - SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates); + SchemaEvolutionUtil.evolveDataFilters(tableFields2, dataFields, predicates); assert filters != null; assertThat(filters.size()).isEqualTo(1); @@ -300,7 +300,7 @@ public void testColumnTypeFilter() { "c", Collections.singletonList(1.0D))); List filters = - SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates); + SchemaEvolutionUtil.evolveDataFilters(tableFields2, dataFields, predicates); assert filters != null; assertThat(filters.size()).isEqualTo(1); diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 91222983bf6ba..1b81cc0da0810 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -167,6 +167,10 @@ under the License. parquet-avro org.apache.parquet + + org.apache.orc + orc-core + diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java new file mode 100644 index 0000000000000..b0c6d5a2e3cf4 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; + +/** TODO all file format tests */ +public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase { + + @Test + public void testDecimalToDecimal() { + // to higher precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(5, 2)" + + ") with (" + + " 'file.format' = 'orc'" + + ")"); + sql("INSERT INTO T VALUES (1, 111.32)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(6, 3))"); + assertThat(sql("SELECT * FROM T WHERE f < 111.321")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.321")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.320")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.321")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + + sql("DROP TABLE T"); + + // to lower precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(6, 3)" + + ") with (" + + " 'file.format' = 'orc'" + + ")"); + sql("INSERT INTO T VALUES (1, 111.321), (2, 111.331)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f > 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.32")) + .containsExactly(Row.of(1, new BigDecimal("111.32"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + } + + @Test + public void testNumericPrimitiveToDecimal() { + String ddl = + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(5, 2)" + + ") with (" + + " 'file.format' = 'orc'" + + ");"; + + // to higher precision + sql(ddl); + sql("INSERT INTO T VALUES (1, 111.32)"); + sql("ALTER TABLE T MODIFY (f DOUBLE)"); + assertThat(sql("SELECT * FROM T WHERE f < 111.321")).containsExactly(Row.of(1, 111.32)); + assertThat(sql("SELECT * FROM T WHERE f = 111.321")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.320")).containsExactly(Row.of(1, 111.32)); + assertThat(sql("SELECT * FROM T WHERE f <> 111.321")).containsExactly(Row.of(1, 111.32)); + + sql("DROP TABLE T"); + + // to lower precision + sql(ddl); + sql("INSERT INTO T VALUES (1, 111.32), (2, 112.33)"); + sql("ALTER TABLE T MODIFY (f INT)"); + assertThat(sql("SELECT * FROM T WHERE f < 112")).containsExactly(Row.of(1, 111)); + assertThat(sql("SELECT * FROM T WHERE f > 112")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111")).containsExactly(Row.of(1, 111)); + assertThat(sql("SELECT * FROM T WHERE f <> 111")).containsExactly(Row.of(2, 112)); + } + + @Test + public void testDecimalToNumericPrimitive() { + // to higher precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f INT" + + ") with (" + + " 'file.format' = 'orc'" + + ");"); + sql("INSERT INTO T VALUES (1, 111)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f < 111.01")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.01")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.00")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.01")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + + sql("DROP TABLE T"); + + // to lower precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DOUBLE" + + ") with (" + + " 'file.format' = 'orc'" + + ");"); + sql("INSERT INTO T VALUES (1, 111.321), (2, 111.331)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f > 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.32")) + .containsExactly(Row.of(1, new BigDecimal("111.32"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + } + + @Test + public void testNumericPrimitive() { + // no checks for high scale to low scale because we don't handle it + + // integer to higher scale integer + sql( + "CREATE TABLE T (" + + " id INT," + + " f TINYINT" + + ") with (" + + " 'file.format' = 'orc'" + + ");"); + sql("INSERT INTO T VALUES (1, CAST (127 AS TINYINT))"); + sql("ALTER TABLE T MODIFY (f INT)"); + // (byte) 383 == 127 + assertThat(sql("SELECT * FROM T WHERE f < 128")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f < 383")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f = 127")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f = 383")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 127")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 383")).containsExactly(Row.of(1, 127)); + } + + @Test + public void testNumericToString() { + // no more tests because we don't pushdown filter + sql( + "CREATE TABLE T (" + + " id INT," + + " f STRING" + + ") with (" + + " 'file.format' = 'orc'" + + ");"); + sql("INSERT INTO T VALUES (1, '1'), (2, '111')"); + sql("ALTER TABLE T MODIFY (f INT)"); + assertThat(sql("SELECT * FROM T WHERE f > 2")).containsExactly(Row.of(2, 111)); + assertThat(sql("SELECT * FROM T WHERE f = 1")).containsExactly(Row.of(1, 1)); + assertThat(sql("SELECT * FROM T WHERE f <> 1")).containsExactly(Row.of(2, 111)); + } + + @Test + public void testStringToDecimal() { + // integer + sql( + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(5, 2)" + + ") with (" + + " 'file.format' = 'orc'" + + ");"); + sql("INSERT INTO T VALUES (1, '111.32'), (2, '222.321')"); + sql("ALTER TABLE T MODIFY (f STRING)"); + + assertThat(sql("SELECT * FROM T WHERE f < '2'")).isEmpty(); // you know + } +}