diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 357461f74d70..70768f317a53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -526,38 +526,23 @@ private Map> createFieldAggregators( List fieldNames = rowType.getFieldNames(); List fieldTypes = rowType.getFieldTypes(); Map> fieldAggregators = new HashMap<>(); - String defaultAggFunc = options.fieldsDefaultFunc(); for (int i = 0; i < fieldNames.size(); i++) { String fieldName = fieldNames.get(i); DataType fieldType = fieldTypes.get(i); - // aggregate by primary keys, so they do not aggregate - boolean isPrimaryKey = primaryKeys.contains(fieldName); - String strAggFunc = options.fieldAggFunc(fieldName); - boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName); - if (strAggFunc != null) { - fieldAggregators.put( - i, - () -> - FieldAggregatorFactory.create( - fieldType, - strAggFunc, - ignoreRetract, - isPrimaryKey, - options, - fieldName)); - } else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) { + if (allSequenceFields.contains(fieldName)) { // no agg for sequence fields + continue; + } + + String aggFuncName = + FieldAggregatorFactory.getAggFuncName(fieldName, primaryKeys, options); + if (aggFuncName != null) { fieldAggregators.put( i, () -> FieldAggregatorFactory.create( - fieldType, - defaultAggFunc, - ignoreRetract, - isPrimaryKey, - options, - fieldName)); + fieldType, fieldName, aggFuncName, options)); } } return fieldAggregators; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java index bad77ba91da5..a27ac99d7219 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java @@ -25,6 +25,7 @@ import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; @@ -132,24 +133,18 @@ public MergeFunction create(@Nullable int[][] projection) { } FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()]; - String defaultAggFunc = options.fieldsDefaultFunc(); + List sequenceFields = options.sequenceField(); for (int i = 0; i < fieldNames.size(); i++) { String fieldName = fieldNames.get(i); DataType fieldType = fieldTypes.get(i); - // aggregate by primary keys, so they do not aggregate - boolean isPrimaryKey = primaryKeys.contains(fieldName); - String strAggFunc = options.fieldAggFunc(fieldName); - strAggFunc = strAggFunc == null ? defaultAggFunc : strAggFunc; + String aggFuncName = + sequenceFields.contains(fieldName) + ? FieldLastNonNullValueAggFactory.NAME // no agg for sequence fields + : FieldAggregatorFactory.getAggFuncName( + fieldName, primaryKeys, options); - boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName); fieldAggregators[i] = - FieldAggregatorFactory.create( - fieldType, - strAggFunc, - ignoreRetract, - isPrimaryKey, - options, - fieldName); + FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options); } return new AggregateMergeFunction(createFieldGetters(fieldTypes), fieldAggregators); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java index d2ce0e4760ad..a5d570dae345 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java @@ -27,6 +27,8 @@ import javax.annotation.Nullable; +import java.util.List; + /** Factory for {@link FieldAggregator}. */ public interface FieldAggregatorFactory extends Factory { @@ -35,37 +37,34 @@ public interface FieldAggregatorFactory extends Factory { String identifier(); static FieldAggregator create( - DataType fieldType, - @Nullable String strAgg, - boolean ignoreRetract, - boolean isPrimaryKey, - CoreOptions options, - String field) { - FieldAggregator fieldAggregator; - if (isPrimaryKey) { - strAgg = FieldPrimaryKeyAggFactory.NAME; - } else if (strAgg == null) { - strAgg = FieldLastNonNullValueAggFactory.NAME; - } - + DataType fieldType, String fieldName, String aggFuncName, CoreOptions options) { FieldAggregatorFactory fieldAggregatorFactory = FactoryUtil.discoverFactory( FieldAggregator.class.getClassLoader(), FieldAggregatorFactory.class, - strAgg); + aggFuncName); if (fieldAggregatorFactory == null) { throw new RuntimeException( String.format( "Use unsupported aggregation: %s or spell aggregate function incorrectly!", - strAgg)); + aggFuncName)); } - fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field); + FieldAggregator fieldAggregator = + fieldAggregatorFactory.create(fieldType, options, fieldName); + return options.fieldAggIgnoreRetract(fieldName) + ? new FieldIgnoreRetractAgg(fieldAggregator) + : fieldAggregator; + } - if (ignoreRetract) { - fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator); + @Nullable + static String getAggFuncName(String fieldName, List primaryKeys, CoreOptions options) { + if (primaryKeys.contains(fieldName)) { + // aggregate by primary keys, so they do not aggregate + return FieldPrimaryKeyAggFactory.NAME; } - return fieldAggregator; + String aggFunc = options.fieldAggFunc(fieldName); + return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index cf99a1157286..166b4479c83e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -936,10 +936,8 @@ public void testCustomAgg() throws IOException { FieldAggregatorFactory.create( DataTypes.STRING(), "custom", - false, - false, - CoreOptions.fromMap(new HashMap<>()), - "custom"); + "custom", + CoreOptions.fromMap(new HashMap<>())); Object agg = fieldAggregator.agg("test", "test"); assertThat(agg).isEqualTo("test"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index 954c1455d4cf..7b8ce3904e1f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -1195,6 +1195,24 @@ public void testMergeRead() { assertThat(batchSql("SELECT * FROM T where v = 1")) .containsExactlyInAnyOrder(Row.of(2, 1, 1)); } + + @Test + public void testSequenceFieldWithDefaultAgg() { + sql( + "CREATE TABLE seq_default_agg (" + + " pk INT PRIMARY KEY NOT ENFORCED," + + " seq INT," + + " v INT) WITH (" + + " 'merge-engine'='aggregation'," + + " 'sequence.field'='seq'," + + " 'fields.default-aggregate-function'='sum'" + + ")"); + + sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)"); + sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)"); + + assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3)); + } } /** ITCase for {@link FieldNestedUpdateAgg}. */