Skip to content

Commit

Permalink
[WIP] Fix filter pushdown with schema evolution
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 16, 2024
1 parent 9179d65 commit 5ea5d09
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,32 @@

package org.apache.paimon.casting;

import org.apache.paimon.data.Decimal;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.GreaterOrEqual;
import org.apache.paimon.predicate.GreaterThan;
import org.apache.paimon.predicate.In;
import org.apache.paimon.predicate.LeafFunction;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.NotEqual;
import org.apache.paimon.predicate.NotIn;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Cast executors for input type and output type. */
Expand Down Expand Up @@ -100,6 +115,110 @@ public class CastExecutors {
return IDENTITY_CAST_EXECUTOR;
}

/**
 * Casts the literals of {@code predicate} (typed with the current table schema) to {@code
 * outputType} (the underlying data file's field type) so that the predicate can be pushed down
 * under schema evolution.
 *
 * @param predicate the leaf predicate whose literals should be cast
 * @param outputType the data file's field type to cast the literals to
 * @return the (possibly cast) literals when pushdown is safe, or {@code null} when the cast
 *     could lose information (precision loss, truncation, overflow) and therefore filter out
 *     rows that must be kept; callers must then drop the predicate and filter after reading
 */
@Nullable
public static List<Object> castPredicateLiteralsEvolutionSafely(
        LeafPredicate predicate, DataType outputType) {
    DataType inputType = predicate.type();
    // Same type up to nullability: nothing to cast.
    if (inputType.equalsIgnoreNullable(outputType)) {
        return predicate.literals();
    }

    List<Object> literals = predicate.literals();

    CastRule<?, ?> castRule = INSTANCE.internalResolve(inputType, outputType);
    if (castRule == null) {
        // No cast rule for this type pair: keep the literals untouched.
        return literals;
    }

    if (castRule instanceof DecimalToDecimalCastRule) {
        // NOTE(review): bails out on equality-style checks when the predicate's precision is
        // smaller than the data's precision — TODO confirm whether scale also matters here.
        if (((DecimalType) inputType).getPrecision() < ((DecimalType) outputType).getPrecision()
                && containsEqualCheck(predicate)) {
            return null;
        }
        return literals;
    } else if (castRule instanceof NumericPrimitiveToDecimalCastRule) {
        // An integer literal may not be exactly representable in the target decimal, so
        // equality-style checks cannot be pushed down safely.
        if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && containsEqualCheck(predicate)) {
            return null;
        }
        return literals.stream()
                .map(literal -> (Number) literal)
                .map(
                        literal ->
                                inputType.is(DataTypeFamily.INTEGER_NUMERIC)
                                        ? BigDecimal.valueOf(literal.longValue())
                                        : BigDecimal.valueOf(literal.doubleValue()))
                .map(bd -> Decimal.fromBigDecimal(bd, bd.precision(), bd.scale()))
                .collect(Collectors.toList());
    } else if (castRule instanceof DecimalToNumericPrimitiveCastRule) {
        if (outputType.is(DataTypeFamily.INTEGER_NUMERIC)
                && (containsPartialCheck(predicate) || containsNotEqualCheck(predicate))) {
            // TODO: scale? Casting a decimal literal to an integer drops the fractional
            // part, which shifts range / not-equal boundaries; skip pushdown.
            return null;
        } else if (containsEqualCheck(predicate)) {
            // Truncation can make originally-unequal values compare equal; skip pushdown.
            return null;
        }

        @SuppressWarnings("unchecked")
        CastExecutor<Object, Object> castExecutor =
                (CastExecutor<Object, Object>) castRule.create(inputType, outputType);
        return literals.stream().map(castExecutor::cast).collect(Collectors.toList());
    } else if (castRule instanceof NumericPrimitiveCastRule) {
        if (inputType.is(DataTypeFamily.INTEGER_NUMERIC)
                && outputType.is(DataTypeFamily.INTEGER_NUMERIC)) {
            if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) {
                // Widened column (e.g. data INT, table BIGINT): literals stay valid as-is.
                return literals;
            } else {
                // the data may be overflow, don't cast the predicate to avoid complexity
                return null;
            }
        }

        // otherwise, the data may be overflow, don't cast the predicate to avoid complexity
        return null;
    } else if (castRule instanceof NumericToStringCastRule) {
        // the data may be overflow, don't cast the predicate to avoid complexity
        return null;
    } else if (castRule instanceof StringToDecimalCastRule) {
        // dangerous: string comparison semantics differ from numeric comparison semantics
        return null;
    } else if (castRule instanceof StringToNumericPrimitiveCastRule) {
        // same reason as above: parsing semantics differ from comparison semantics
        return null;
    }

    // Remaining rules are treated as lossless for comparison purposes; cast literal by literal.
    @SuppressWarnings("unchecked")
    CastExecutor<Object, Object> castExecutor =
            (CastExecutor<Object, Object>) castRule.create(inputType, outputType);
    return literals.stream()
            .map(l -> castExecutor == null ? l : castExecutor.cast(l))
            .collect(Collectors.toList());
}

/** Whether the predicate's function includes an equality comparison (=, IN, >=, <=). */
private static boolean containsEqualCheck(LeafPredicate predicate) {
    LeafFunction f = predicate.function();
    if (f instanceof In || f instanceof Equal) {
        return true;
    }
    return f instanceof GreaterOrEqual || f instanceof LessOrEqual;
}

/** Whether the predicate's function is a range comparison (<, <=, >, >=). */
private static boolean containsPartialCheck(LeafPredicate predicate) {
    LeafFunction f = predicate.function();
    boolean lessSide = f instanceof LessThan || f instanceof LessOrEqual;
    boolean greaterSide = f instanceof GreaterThan || f instanceof GreaterOrEqual;
    return lessSide || greaterSide;
}

/** Whether the predicate's function is a negated equality comparison (<>, NOT IN). */
private static boolean containsNotEqualCheck(LeafPredicate predicate) {
    LeafFunction f = predicate.function();
    return f instanceof NotEqual || f instanceof NotIn;
}

/**
 * Returns true if integer type root {@code a} is strictly wider than {@code b}, following
 * TINYINT < SMALLINT < INTEGER < BIGINT.
 *
 * <p>Fix: the previous implementation was not strict — it returned true for
 * {@code (INTEGER, INTEGER)} (because it only excluded BIGINT) and for
 * {@code (BIGINT, BIGINT)}, which contradicts the "larger than" contract.
 */
private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) {
    return integerWidthRank(a) > integerWidthRank(b);
}

/** Width rank of an integer type root: TINYINT(0) < SMALLINT(1) < INTEGER(2) < BIGINT(3). */
private static int integerWidthRank(DataTypeRoot root) {
    switch (root) {
        case TINYINT:
            return 0;
        case SMALLINT:
            return 1;
        case INTEGER:
            return 2;
        case BIGINT:
            return 3;
        default:
            throw new IllegalArgumentException("Not an integer type root: " + root);
    }
}

// Map<Target family or root, Map<Input family or root, rule>>
private final Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
Predicate dataPredicate =
dataFilterMapping.computeIfAbsent(
entry.file().schemaId(),
id -> simpleStatsEvolutions.convertFilter(entry.file().schemaId(), filter));
id ->
simpleStatsEvolutions.tryEvolveFilter(
entry.file().schemaId(), filter));

try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE
schemaId2DataFilter.computeIfAbsent(
entry.file().schemaId(),
id ->
fieldValueStatsConverters.convertFilter(
fieldValueStatsConverters.tryEvolveFilter(
entry.file().schemaId(), valueFilter));
return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,17 @@ public static int[][] createDataProjection(
}

/**
* Create predicate list from data fields. We will visit all predicate in filters, reset it's
* field index, name and type, and ignore predicate if the field is not exist.
* Evolve data field filters because of schema evolution. We visit every predicate in
* filters, reset its field index, name and type, and drop the predicate if the field does
* not exist in the data schema.
*
* @param tableFields the table fields
* @param dataFields the underlying data fields
* @param filters the filters
* @return the data filters
*/
@Nullable
public static List<Predicate> createDataFilters(
public static List<Predicate> evolveDataFilters(
List<DataField> tableFields, List<DataField> dataFields, List<Predicate> filters) {
if (filters == null) {
return null;
Expand All @@ -308,29 +309,24 @@ public static List<Predicate> createDataFilters(
return Optional.empty();
}

DataType dataValueType = dataField.type().copy(true);
DataType predicateType = predicate.type().copy(true);
CastExecutor<Object, Object> castExecutor =
dataValueType.equals(predicateType)
? null
: (CastExecutor<Object, Object>)
CastExecutors.resolve(
predicate.type(), dataField.type());
// Convert value from predicate type to underlying data type which may lose
// information, for example, convert double value to int. But it doesn't matter
// because it just for predicate push down and the data will be filtered
// correctly after reading.
List<Object> literals =
predicate.literals().stream()
.map(v -> castExecutor == null ? v : castExecutor.cast(v))
.collect(Collectors.toList());
return Optional.of(
new LeafPredicate(
predicate.function(),
dataField.type(),
indexOf(dataField, idToDataFields),
dataField.name(),
literals));
DataType dataValueType = dataField.type();
DataType predicateType = predicate.type();

List<Object> evolvedLiterals =
dataValueType.equalsIgnoreNullable(predicateType)
? predicate.literals()
: CastExecutors.castPredicateLiteralsEvolutionSafely(
predicate, dataValueType);

return evolvedLiterals == null
? Optional.empty()
: Optional.of(
new LeafPredicate(
predicate.function(),
dataField.type(),
indexOf(dataField, idToDataFields),
dataField.name(),
evolvedLiterals));
};

for (Predicate predicate : filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
});
}

public Predicate convertFilter(long dataSchemaId, Predicate filter) {
return tableSchemaId == dataSchemaId
? filter
: Objects.requireNonNull(
SchemaEvolutionUtil.createDataFilters(
schemaFields.apply(tableSchemaId),
schemaFields.apply(dataSchemaId),
Collections.singletonList(filter)))
.get(0);
/**
 * Evolves {@code filter} (expressed against the current table schema) to the schema identified
 * by {@code dataSchemaId}.
 *
 * @param dataSchemaId the schema id of the data file the filter should be evaluated against
 * @param filter the filter expressed in terms of the table schema
 * @return the evolved filter, the original filter when the schema ids match, or {@code null}
 *     when the filter cannot be evolved safely and must not be pushed down
 */
@Nullable
public Predicate tryEvolveFilter(long dataSchemaId, Predicate filter) {
    if (tableSchemaId == dataSchemaId) {
        // Same schema: no evolution needed.
        return filter;
    }
    // evolveDataFilters never returns null for a non-null filter list, but it may drop
    // filters that cannot be mapped to the data schema, leaving the list empty.
    List<Predicate> evolved =
            Objects.requireNonNull(
                    SchemaEvolutionUtil.evolveDataFilters(
                            schemaFields.apply(tableSchemaId),
                            schemaFields.apply(dataSchemaId),
                            Collections.singletonList(filter)));
    return evolved.isEmpty() ? null : evolved.get(0);
}

public List<DataField> tableDataFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private List<Predicate> readFilters(
List<Predicate> dataFilters =
tableSchema.id() == dataSchema.id()
? filters
: SchemaEvolutionUtil.createDataFilters(
: SchemaEvolutionUtil.evolveDataFilters(
tableSchema.fields(), dataSchema.fields(), filters);

// Skip pushing down partition filters to reader.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public void testCreateDataFilters() {
IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList()));

List<Predicate> filters =
SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates);
SchemaEvolutionUtil.evolveDataFilters(tableFields2, dataFields, predicates);
assert filters != null;
assertThat(filters.size()).isEqualTo(1);

Expand All @@ -300,7 +300,7 @@ public void testColumnTypeFilter() {
"c",
Collections.singletonList(1.0D)));
List<Predicate> filters =
SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates);
SchemaEvolutionUtil.evolveDataFilters(tableFields2, dataFields, predicates);
assert filters != null;
assertThat(filters.size()).isEqualTo(1);

Expand Down
4 changes: 4 additions & 0 deletions paimon-flink/paimon-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ under the License.
<artifactId>parquet-avro</artifactId>
<groupId>org.apache.parquet</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
Expand Down
Loading

0 comments on commit 5ea5d09

Please sign in to comment.