Skip to content

Commit

Permalink
[core] Fix that sequence fields are mistakenly aggregated by default aggregator in AggregateMergeFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jan 21, 2025
1 parent cfb0075 commit a6cdb1c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -526,38 +526,23 @@ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
List<String> fieldNames = rowType.getFieldNames();
List<DataType> fieldTypes = rowType.getFieldTypes();
Map<Integer, Supplier<FieldAggregator>> fieldAggregators = new HashMap<>();
String defaultAggFunc = options.fieldsDefaultFunc();
for (int i = 0; i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
DataType fieldType = fieldTypes.get(i);
// aggregate by primary keys, so they do not aggregate
boolean isPrimaryKey = primaryKeys.contains(fieldName);
String strAggFunc = options.fieldAggFunc(fieldName);
boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);

if (strAggFunc != null) {
fieldAggregators.put(
i,
() ->
FieldAggregatorFactory.create(
fieldType,
strAggFunc,
ignoreRetract,
isPrimaryKey,
options,
fieldName));
} else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) {
if (allSequenceFields.contains(fieldName)) {
// no agg for sequence fields
continue;
}

String aggFuncName =
FieldAggregatorFactory.getAggFuncName(fieldName, primaryKeys, options);
if (aggFuncName != null) {
fieldAggregators.put(
i,
() ->
FieldAggregatorFactory.create(
fieldType,
defaultAggFunc,
ignoreRetract,
isPrimaryKey,
options,
fieldName));
fieldType, fieldName, aggFuncName, options));
}
}
return fieldAggregators;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
Expand Down Expand Up @@ -132,24 +133,18 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
}

FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];
String defaultAggFunc = options.fieldsDefaultFunc();
List<String> sequenceFields = options.sequenceField();
for (int i = 0; i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
DataType fieldType = fieldTypes.get(i);
// aggregate by primary keys, so they do not aggregate
boolean isPrimaryKey = primaryKeys.contains(fieldName);
String strAggFunc = options.fieldAggFunc(fieldName);
strAggFunc = strAggFunc == null ? defaultAggFunc : strAggFunc;
String aggFuncName =
sequenceFields.contains(fieldName)
? FieldLastNonNullValueAggFactory.NAME // no agg for sequence fields
: FieldAggregatorFactory.getAggFuncName(
fieldName, primaryKeys, options);

boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);
fieldAggregators[i] =
FieldAggregatorFactory.create(
fieldType,
strAggFunc,
ignoreRetract,
isPrimaryKey,
options,
fieldName);
FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options);
}

return new AggregateMergeFunction(createFieldGetters(fieldTypes), fieldAggregators);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import javax.annotation.Nullable;

import java.util.List;

/** Factory for {@link FieldAggregator}. */
public interface FieldAggregatorFactory extends Factory {

Expand All @@ -35,37 +37,34 @@ public interface FieldAggregatorFactory extends Factory {
String identifier();

static FieldAggregator create(
DataType fieldType,
@Nullable String strAgg,
boolean ignoreRetract,
boolean isPrimaryKey,
CoreOptions options,
String field) {
FieldAggregator fieldAggregator;
if (isPrimaryKey) {
strAgg = FieldPrimaryKeyAggFactory.NAME;
} else if (strAgg == null) {
strAgg = FieldLastNonNullValueAggFactory.NAME;
}

DataType fieldType, String fieldName, String aggFuncName, CoreOptions options) {
FieldAggregatorFactory fieldAggregatorFactory =
FactoryUtil.discoverFactory(
FieldAggregator.class.getClassLoader(),
FieldAggregatorFactory.class,
strAgg);
aggFuncName);
if (fieldAggregatorFactory == null) {
throw new RuntimeException(
String.format(
"Use unsupported aggregation: %s or spell aggregate function incorrectly!",
strAgg));
aggFuncName));
}

fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field);
FieldAggregator fieldAggregator =
fieldAggregatorFactory.create(fieldType, options, fieldName);
return options.fieldAggIgnoreRetract(fieldName)
? new FieldIgnoreRetractAgg(fieldAggregator)
: fieldAggregator;
}

if (ignoreRetract) {
fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator);
@Nullable
static String getAggFuncName(String fieldName, List<String> primaryKeys, CoreOptions options) {
if (primaryKeys.contains(fieldName)) {
// aggregate by primary keys, so they do not aggregate
return FieldPrimaryKeyAggFactory.NAME;
}

return fieldAggregator;
String aggFunc = options.fieldAggFunc(fieldName);
return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -936,10 +936,8 @@ public void testCustomAgg() throws IOException {
FieldAggregatorFactory.create(
DataTypes.STRING(),
"custom",
false,
false,
CoreOptions.fromMap(new HashMap<>()),
"custom");
"custom",
CoreOptions.fromMap(new HashMap<>()));

Object agg = fieldAggregator.agg("test", "test");
assertThat(agg).isEqualTo("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,24 @@ public void testMergeRead() {
assertThat(batchSql("SELECT * FROM T where v = 1"))
.containsExactlyInAnyOrder(Row.of(2, 1, 1));
}

/**
 * Checks that a sequence field is not folded by the table-wide default aggregate function:
 * with {@code sum} as the default, only {@code v} is summed while {@code seq} is expected to
 * come back as 2 instead of 1 + 2.
 */
@Test
public void testSequenceFieldWithDefaultAgg() {
    // Aggregation table whose sequence field has no explicit per-field aggregate function.
    String createTable =
            "CREATE TABLE seq_default_agg ("
                    + " pk INT PRIMARY KEY NOT ENFORCED,"
                    + " seq INT,"
                    + " v INT) WITH ("
                    + " 'merge-engine'='aggregation',"
                    + " 'sequence.field'='seq',"
                    + " 'fields.default-aggregate-function'='sum'"
                    + ")";
    sql(createTable);

    // Two writes to the same primary key, in increasing sequence order.
    sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)");
    sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)");

    // Expected merged row: pk = 0, seq = 2 (not summed), v = 1 + 2 = 3.
    assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3));
}
}

/** ITCase for {@link FieldNestedUpdateAgg}. */
Expand Down

0 comments on commit a6cdb1c

Please sign in to comment.