Skip to content

Commit

Permalink
[core] Fix that sequence fields are mistakenly aggregated by default aggregator in AggregateMergeFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jan 21, 2025
1 parent cfb0075 commit 30e1fc9
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,6 @@ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
fieldType,
strAggFunc,
ignoreRetract,
isPrimaryKey,
options,
fieldName));
} else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) {
Expand All @@ -555,7 +554,6 @@ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
fieldType,
defaultAggFunc,
ignoreRetract,
isPrimaryKey,
options,
fieldName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
Expand Down Expand Up @@ -109,6 +111,8 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
private final List<String> tableNames;
private final List<DataType> tableTypes;
private final List<String> primaryKeys;
private final List<String> sequenceFields;
@Nullable private final String defaultAggFunc;

private Factory(
Options conf,
Expand All @@ -119,6 +123,11 @@ private Factory(
this.tableNames = tableNames;
this.tableTypes = tableTypes;
this.primaryKeys = primaryKeys;
this.sequenceFields = options.sequenceField();

String defaultAggFunc = options.fieldsDefaultFunc();
this.defaultAggFunc =
defaultAggFunc == null ? FieldLastNonNullValueAggFactory.NAME : defaultAggFunc;
}

@Override
Expand All @@ -132,27 +141,31 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
}

FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];
String defaultAggFunc = options.fieldsDefaultFunc();
for (int i = 0; i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
DataType fieldType = fieldTypes.get(i);
// aggregate by primary keys, so they do not aggregate
boolean isPrimaryKey = primaryKeys.contains(fieldName);
String strAggFunc = options.fieldAggFunc(fieldName);
strAggFunc = strAggFunc == null ? defaultAggFunc : strAggFunc;

String strAggFunc = getAggFunc(fieldName);
boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);
fieldAggregators[i] =
FieldAggregatorFactory.create(
fieldType,
strAggFunc,
ignoreRetract,
isPrimaryKey,
options,
fieldName);
fieldType, strAggFunc, ignoreRetract, options, fieldName);
}

return new AggregateMergeFunction(createFieldGetters(fieldTypes), fieldAggregators);
}

/**
 * Resolves the name of the aggregate function to apply to {@code fieldName}.
 *
 * <p>Resolution order: primary key fields, then sequence fields, then the function
 * configured for the field in {@code options}, and finally {@code defaultAggFunc}.
 *
 * @param fieldName name of the field to resolve the aggregate function for
 * @return the aggregate function name selected for the field
 */
private String getAggFunc(String fieldName) {
    final String resolved;
    if (primaryKeys.contains(fieldName)) {
        // Primary key fields get the dedicated primary-key aggregator name.
        resolved = FieldPrimaryKeyAggFactory.NAME;
    } else if (sequenceFields.contains(fieldName)) {
        // Sequence fields bypass both the per-field and the default configuration.
        resolved = FieldLastNonNullValueAggFactory.NAME;
    } else {
        String configured = options.fieldAggFunc(fieldName);
        resolved = configured != null ? configured : defaultAggFunc;
    }
    return resolved;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,8 @@ static FieldAggregator create(
DataType fieldType,
@Nullable String strAgg,
boolean ignoreRetract,
boolean isPrimaryKey,
CoreOptions options,
String field) {
FieldAggregator fieldAggregator;
if (isPrimaryKey) {
strAgg = FieldPrimaryKeyAggFactory.NAME;
} else if (strAgg == null) {
strAgg = FieldLastNonNullValueAggFactory.NAME;
}

FieldAggregatorFactory fieldAggregatorFactory =
FactoryUtil.discoverFactory(
FieldAggregator.class.getClassLoader(),
Expand All @@ -60,12 +52,7 @@ static FieldAggregator create(
strAgg));
}

fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field);

if (ignoreRetract) {
fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator);
}

return fieldAggregator;
FieldAggregator fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field);
return ignoreRetract ? new FieldIgnoreRetractAgg(fieldAggregator) : fieldAggregator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,6 @@ public void testCustomAgg() throws IOException {
DataTypes.STRING(),
"custom",
false,
false,
CoreOptions.fromMap(new HashMap<>()),
"custom");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,24 @@ public void testMergeRead() {
assertThat(batchSql("SELECT * FROM T where v = 1"))
.containsExactlyInAnyOrder(Row.of(2, 1, 1));
}

/**
 * Checks a table that combines a sequence field with a table-wide default aggregate
 * function of {@code sum}: after two inserts for the same key, {@code v} is expected to
 * be summed (1 + 2 = 3) while {@code seq} is expected to be 2 rather than the sum 3.
 */
@Test
public void testSequenceFieldWithDefaultAgg() {
    String createTable =
            "CREATE TABLE seq_default_agg ("
                    + " pk INT PRIMARY KEY NOT ENFORCED,"
                    + " seq INT,"
                    + " v INT) WITH ("
                    + " 'merge-engine'='aggregation',"
                    + " 'sequence.field'='seq',"
                    + " 'fields.default-aggregate-function'='sum'"
                    + ")";
    sql(createTable);

    // Two rows sharing pk = 0, written in separate statements.
    String firstInsert = "INSERT INTO seq_default_agg VALUES (0, 1, 1)";
    String secondInsert = "INSERT INTO seq_default_agg VALUES (0, 2, 2)";
    sql(firstInsert);
    sql(secondInsert);

    Row expected = Row.of(0, 2, 3);
    assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(expected);
}
}

/** ITCase for {@link FieldNestedUpdateAgg}. */
Expand Down

0 comments on commit 30e1fc9

Please sign in to comment.