diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 47d969e616c2..2fbe4a28e512 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -22,7 +22,6 @@ import java.io.File; import java.util.Collections; import java.util.List; -import java.util.Map; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -60,12 +59,7 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD PartialUpsertHandler partialUpsertHandler = null; if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { - Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); - Preconditions.checkArgument(partialUpsertStrategies != null, - "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); - partialUpsertHandler = - new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), - comparisonColumns); + partialUpsertHandler = new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); } String deleteRecordColumn = upsertConfig.getDeleteRecordColumn(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 735750ff9d97..e513e3879eb7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -19,7 +19,9 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,6 +51,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp // Used to initialize a reference to previous row for merging in partial upsert private final LazyRow _reusePreviousRow = new LazyRow(); + private final Map _reuseMergeResultHolder = new HashMap<>(); @VisibleForTesting final ConcurrentHashMap _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); @@ -340,7 +343,8 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { int currentDocId = recordLocation.getDocId(); if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) { _reusePreviousRow.init(currentSegment, currentDocId); - _partialUpsertHandler.merge(_reusePreviousRow, record); + _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergeResultHolder); + _reuseMergeResultHolder.clear(); } } return recordLocation; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 8fef9c360276..118412ab7725 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -18,90 +18,73 @@ */ package org.apache.pinot.segment.local.upsert; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; +import javax.annotation.Nullable; import org.apache.pinot.segment.local.segment.readers.LazyRow; -import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; /** * Handler for partial-upsert. + * + * This class is responsible for merging the new record with the previous record. + * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column, + * it uses the default merge strategy. + * + * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}. + * If a merger for row is defined then it takes precedence and ignores column mergers. */ public class PartialUpsertHandler { - // _column2Mergers maintains the mapping of merge strategies per columns. - private final Map _column2Mergers = new HashMap<>(); - private final PartialUpsertMerger _defaultPartialUpsertMerger; - private final List _comparisonColumns; private final List _primaryKeyColumns; + private final List _comparisonColumns; + private final TreeMap _fieldSpecMap; + private final PartialUpsertMerger _partialUpsertMerger; - public PartialUpsertHandler(Schema schema, Map partialUpsertStrategies, - UpsertConfig.Strategy defaultPartialUpsertStrategy, List comparisonColumns) { - _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); - _comparisonColumns = comparisonColumns; + public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig) { _primaryKeyColumns = schema.getPrimaryKeyColumns(); + _comparisonColumns = comparisonColumns; + _fieldSpecMap = schema.getFieldSpecMap(); + _partialUpsertMerger = + PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); + } - for (Map.Entry entry : partialUpsertStrategies.entrySet()) { - _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); + public void merge(LazyRow previousRow, GenericRow newRow, Map resultHolder) { + _partialUpsertMerger.merge(previousRow, newRow, resultHolder); + + // iterate over only merger results and update newRecord with merged values + for (Map.Entry entry : resultHolder.entrySet()) { + // skip primary key and comparison columns + String column = entry.getKey(); + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; + } + setMergedValue(newRow, column, entry.getValue()); } - } - /** - * Merges records and returns the merged record. - * We used a map to indicate all configured fields for partial upsert. For these fields - * (1) If the prev value is null, return the new value - * (2) If the prev record is not null, the new value is null, return the prev value. - * (3) If neither values are not null, then merge the value and return. - * For un-configured fields, they are using default override behavior, regardless null values. - * - * For example, overwrite merger will only override the prev value if the new value is not null. - * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. - * - * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for - * re-reads. - * @param newRecord the new consumed record. - */ - public void merge(LazyRow prevRecord, GenericRow newRecord) { - for (String column : prevRecord.getColumnNames()) { - if (!_primaryKeyColumns.contains(column)) { - PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); - // Non-overwrite mergers - // (1) If the value of the previous is null value, skip merging and use the new value - // (2) Else If the value of new value is null, use the previous value (even for comparison columns). - // (3) Else If the column is not a comparison column, we applied the merged value to it. - if (!(merger instanceof OverwriteMerger)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - if (newRecord.isNullValue(column)) { - // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of - // using - // multiple comparison columns. We never apply a merge function to it, rather we just take any/all - // non-null comparison column values from the previous record, and the sole non-null comparison column - // value from the new record. - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } else if (!_comparisonColumns.contains(column)) { - newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column))); - } - } - } else { - // Overwrite mergers. - // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value - // (2) Otherwise, if previous is not null, init columnReader and use the previous value. - if (newRecord.isNullValue(column)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } - } - } + // handle comparison columns + for (String column : _comparisonColumns) { + if (newRow.isNullValue(column) && !previousRow.isNullValue(column)) { + newRow.putValue(column, previousRow.getValue(column)); + newRow.removeNullValueField(column); } } } + + private void setMergedValue(GenericRow row, String column, @Nullable Object mergedValue) { + if (mergedValue != null) { + // remove null value field if it was set + row.removeNullValueField(column); + row.putValue(column, mergedValue); + } else { + // if column exists but mapped to a null value then merger result was a null value + row.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue()); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java new file mode 100644 index 000000000000..07923fede2b0 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import java.util.List; +import org.apache.pinot.spi.config.table.UpsertConfig; + + +public abstract class BasePartialUpsertMerger implements PartialUpsertMerger { + protected final List _primaryKeyColumns; + protected final List _comparisonColumns; + protected final UpsertConfig _upsertConfig; + + protected BasePartialUpsertMerger(List primaryKeyColumns, List comparisonColumns, + UpsertConfig upsertConfig) { + _primaryKeyColumns = primaryKeyColumns; + _comparisonColumns = comparisonColumns; + _upsertConfig = upsertConfig; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java new file mode 100644 index 000000000000..3e510a8bcf9e --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.segment.local.segment.readers.LazyRow; +import org.apache.pinot.segment.local.upsert.merger.columnar.OverwriteMerger; +import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger; +import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** + * Default Partial upsert merger implementation. + * PartialUpsertColumnarMerger iterates over each column and merges them based on the defined strategy per column in + * table config. + */ +public class PartialUpsertColumnarMerger extends BasePartialUpsertMerger { + private final PartialUpsertColumnMerger _defaultColumnValueMerger; + private final Map _column2Mergers = new HashMap<>(); + + public PartialUpsertColumnarMerger(List primaryKeyColumns, List comparisonColumns, + UpsertConfig upsertConfig) { + super(primaryKeyColumns, comparisonColumns, upsertConfig); + _defaultColumnValueMerger = + PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); + Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); + Preconditions.checkArgument(partialUpsertStrategies != null, "Partial upsert strategies must be configured"); + for (Map.Entry entry : partialUpsertStrategies.entrySet()) { + _column2Mergers.put(entry.getKey(), PartialUpsertColumnMergerFactory.getMerger(entry.getValue())); + } + } + + /** + * Merges records and returns the merged record. + * We used a map to indicate all configured fields for partial upsert. For these fields + * (1) If the prev value is null, return the new value + * (2) If the prev record is not null, the new value is null, return the prev value. + * (3) If neither values are not null, then merge the value and return. + * For un-configured fields, they are using default override behavior, regardless null values. + * + * For example, overwrite merger will only override the prev value if the new value is not null. + * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. + */ + @Override + public void merge(LazyRow previousRow, GenericRow newRow, Map resultHolder) { + for (String column : previousRow.getColumnNames()) { + // Skip primary key and comparison columns + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; + } + PartialUpsertColumnMerger merger = _column2Mergers.getOrDefault(column, _defaultColumnValueMerger); + // Non-overwrite mergers + // (1) If the value of the previous is null value, skip merging and use the new value + // (2) Else If the value of new value is null, use the previous value (even for comparison columns) + // (3) Else If the column is not a comparison column, we applied the merged value to it + if (!(merger instanceof OverwriteMerger)) { + Object prevValue = previousRow.getValue(column); + if (prevValue != null) { + if (newRow.isNullValue(column)) { + resultHolder.put(column, prevValue); + } else { + resultHolder.put(column, merger.merge(prevValue, newRow.getValue(column))); + } + } + } else { + // Overwrite mergers + // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value + // (2) Otherwise, use previous value if it is not null + if (newRow.isNullValue(column)) { + Object prevValue = previousRow.getValue(column); + if (prevValue != null) { + resultHolder.put(column, prevValue); + } + } + } + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java index 817d9531b3ee..2fdd3c821872 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java @@ -18,13 +18,20 @@ */ package org.apache.pinot.segment.local.upsert.merger; +import java.util.Map; +import org.apache.pinot.segment.local.segment.readers.LazyRow; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** + * Merger to merge previously persisted row with the new incoming row. + * Custom implementation can be plugged by implementing this interface and add the class name to the upsert config. + */ public interface PartialUpsertMerger { + /** - * Handle partial upsert merge. - * - * @param previousValue the value of given field from the last derived full record during ingestion. - * @param currentValue the value of given field from the new consumed record. - * @return a new value after merge + * Merges previous row with new incoming row and persists the merged results per column in the provided resultHolder. + * Primary key and comparison columns should not be merged because their values are not allowed to be modified. */ - Object merge(Object previousValue, Object currentValue); + void merge(LazyRow previousRow, GenericRow newRow, Map resultHolder); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java index 55e0912c8f5b..5b3b0148026e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.segment.local.upsert.merger; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -25,32 +27,23 @@ public class PartialUpsertMergerFactory { private PartialUpsertMergerFactory() { } - private static final AppendMerger APPEND_MERGER = new AppendMerger(); - private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger(); - private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger(); - private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger(); - private static final MaxMerger MAX_MERGER = new MaxMerger(); - private static final MinMerger MIN_MERGER = new MinMerger(); - private static final UnionMerger UNION_MERGER = new UnionMerger(); - - public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) { - switch (strategy) { - case APPEND: - return APPEND_MERGER; - case INCREMENT: - return INCREMENT_MERGER; - case IGNORE: - return IGNORE_MERGER; - case MAX: - return MAX_MERGER; - case MIN: - return MIN_MERGER; - case OVERWRITE: - return OVERWRITE_MERGER; - case UNION: - return UNION_MERGER; - default: - throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy); + /** + * Returns the default partial upsert merger or a custom implementation from a given class name in the config. + */ + public static PartialUpsertMerger getPartialUpsertMerger(List primaryKeyColumns, + List comparisonColumns, UpsertConfig upsertConfig) { + String customMergerClassName = upsertConfig.getPartialUpsertMergerClass(); + // If a custom implementation is provided in config, initialize an implementation and return. + if (StringUtils.isNotBlank(customMergerClassName)) { + try { + Class partialUpsertMergerClass = Class.forName(customMergerClassName); + return (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) + .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to instantiate partial upsert merger with class: %s", customMergerClassName), e); + } } + return new PartialUpsertColumnarMerger(primaryKeyColumns, comparisonColumns, upsertConfig); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java similarity index 91% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java index 3ed7c9035f70..3666ae890203 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java @@ -16,16 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; /** * Merges 2 records and returns the merged record. * Append the new value from incoming row to the existing value from multi-value field. Then return the merged record. * Append merger allows duplicated records in the multi-value field. */ -public class AppendMerger implements PartialUpsertMerger { - AppendMerger() { - } +public class AppendMerger implements PartialUpsertColumnMerger { /** * Append the new value from incoming row to the given multi-value field of previous record. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java similarity index 88% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java index c9bb16fdf032..16cab35fc333 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java @@ -16,15 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; /** * Merges 2 records and returns the merged record. * By default, ignore the new value from incoming row. Then return the merged record. */ -public class IgnoreMerger implements PartialUpsertMerger { - IgnoreMerger() { - } +public class IgnoreMerger implements PartialUpsertColumnMerger { @Override public Object merge(Object previousValue, Object currentValue) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java similarity index 91% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java index 34b4d0185c0f..e26dda98c3de 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java @@ -16,15 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; /** * Merges 2 records and returns the merged record. * Add the new value from incoming row to the existing value from numeric field. Then return the merged record. */ -public class IncrementMerger implements PartialUpsertMerger { - IncrementMerger() { - } +public class IncrementMerger implements PartialUpsertColumnMerger { /** * Increment the new value from incoming row to the given field of previous record. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java similarity index 89% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java index 9ab421b45d62..c21ddaecd937 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java @@ -16,12 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; -public class MaxMerger implements PartialUpsertMerger { - - MaxMerger() { - } +public class MaxMerger implements PartialUpsertColumnMerger { /** * Keep the maximal value for the given field. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java similarity index 89% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java index 49f7f2380f5c..aeddafe8dd3f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java @@ -16,12 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; -public class MinMerger implements PartialUpsertMerger { - - MinMerger() { - } +public class MinMerger implements PartialUpsertColumnMerger { /** * Keep the minimal value for the given field. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java similarity index 88% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java index 48795671852e..915a43174199 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java @@ -16,15 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; /** * Merges 2 records and returns the merged record. * Overwrite the existing value for the given field. Then return the merged record. */ -public class OverwriteMerger implements PartialUpsertMerger { - OverwriteMerger() { - } +public class OverwriteMerger implements PartialUpsertColumnMerger { @Override public Object merge(Object previousValue, Object currentValue) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java new file mode 100644 index 000000000000..dee7b94b52c2 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger.columnar; + +public interface PartialUpsertColumnMerger { + + /** + * Handle partial upsert merge for single column between previous and new row. + * + * @param previousValue the value of given field from the last derived full record during ingestion. + * @param currentValue the value of given field from the new consumed record. + * @return a new value after merge + */ + Object merge(Object previousValue, Object currentValue); +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerFactory.java new file mode 100644 index 000000000000..1cd44c4e6f6b --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerFactory.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger.columnar; + +import org.apache.pinot.spi.config.table.UpsertConfig; + + +public class PartialUpsertColumnMergerFactory { + private PartialUpsertColumnMergerFactory() { + } + + private static final AppendMerger APPEND_MERGER = new AppendMerger(); + private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger(); + private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger(); + private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger(); + private static final MaxMerger MAX_MERGER = new MaxMerger(); + private static final MinMerger MIN_MERGER = new MinMerger(); + private static final UnionMerger UNION_MERGER = new UnionMerger(); + + public static PartialUpsertColumnMerger getMerger(UpsertConfig.Strategy strategy) { + switch (strategy) { + case APPEND: + return APPEND_MERGER; + case INCREMENT: + return INCREMENT_MERGER; + case IGNORE: + return IGNORE_MERGER; + case MAX: + return MAX_MERGER; + case MIN: + return MIN_MERGER; + case OVERWRITE: + return OVERWRITE_MERGER; + case UNION: + return UNION_MERGER; + default: + throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy); + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java similarity index 92% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java index 2a2c03033faf..c0b2eeef54b6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; import java.util.Set; import java.util.TreeSet; @@ -27,9 +27,7 @@ * Added the new value from incoming row to the existing value from multi-value field. Then return the merged record. * Union merger will dedup duplicated records in the multi-value field. */ -public class UnionMerger implements PartialUpsertMerger { - UnionMerger() { - } +public class UnionMerger implements PartialUpsertColumnMerger { /** * Union the new value from incoming row to the given multi-value field of previous record. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index a713c81492a9..d03e5ab9cfc0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -921,32 +921,41 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); assert upsertConfig != null; Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); + String partialUpsertMergerClass = upsertConfig.getPartialUpsertMergerClass(); - List primaryKeyColumns = schema.getPrimaryKeyColumns(); - for (Map.Entry entry : partialUpsertStrategies.entrySet()) { - String column = entry.getKey(); - UpsertConfig.Strategy columnStrategy = entry.getValue(); - Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns"); - - if (upsertConfig.getComparisonColumns() != null) { - Preconditions.checkState(!upsertConfig.getComparisonColumns().contains(column), - "Merger cannot be applied to comparison column"); - } else { - Preconditions.checkState(!tableConfig.getValidationConfig().getTimeColumnName().equals(column), - "Merger cannot be applied to time column"); - } + // check if partialUpsertMergerClass is provided then partialUpsertStrategies should be empty + if (StringUtils.isNotBlank(partialUpsertMergerClass)) { + Preconditions.checkState(MapUtils.isEmpty(partialUpsertStrategies), + "If partialUpsertMergerClass is provided then partialUpsertStrategies should be empty"); + } else { + List primaryKeyColumns = schema.getPrimaryKeyColumns(); + // validate partial upsert column mergers + for (Map.Entry entry : partialUpsertStrategies.entrySet()) { + String column = entry.getKey(); + UpsertConfig.Strategy columnStrategy = entry.getValue(); + Preconditions.checkState(!primaryKeyColumns.contains(column), + "Merger cannot be applied to primary key columns"); + + if (upsertConfig.getComparisonColumns() != null) { + Preconditions.checkState(!upsertConfig.getComparisonColumns().contains(column), + "Merger cannot be applied to comparison column"); + } else { + Preconditions.checkState(!tableConfig.getValidationConfig().getTimeColumnName().equals(column), + "Merger cannot be applied to time column"); + } - FieldSpec fieldSpec = schema.getFieldSpecFor(column); - Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to non-existing column: %s", column); - - if (columnStrategy == UpsertConfig.Strategy.INCREMENT) { - Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(), - "INCREMENT merger cannot be applied to non-numeric column: %s", column); - Preconditions.checkState(!schema.getDateTimeNames().contains(column), - "INCREMENT merger cannot be applied to date time column: %s", column); - } else if (columnStrategy == UpsertConfig.Strategy.APPEND || columnStrategy == UpsertConfig.Strategy.UNION) { - Preconditions.checkState(!fieldSpec.isSingleValueField(), - "%s merger cannot be applied to single-value column: %s", columnStrategy.toString(), column); + FieldSpec fieldSpec = schema.getFieldSpecFor(column); + Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to non-existing column: %s", column); + + if (columnStrategy == UpsertConfig.Strategy.INCREMENT) { + Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(), + "INCREMENT merger cannot be applied to non-numeric column: %s", column); + Preconditions.checkState(!schema.getDateTimeNames().contains(column), + "INCREMENT merger cannot be applied to date time column: %s", column); + } else if (columnStrategy == UpsertConfig.Strategy.APPEND || columnStrategy == UpsertConfig.Strategy.UNION) { + Preconditions.checkState(!fieldSpec.isSingleValueField(), + "%s merger cannot be applied to single-value column: %s", columnStrategy.toString(), column); + } } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index f66aa2a67963..4b954aa1400e 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.upsert; +import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -25,18 +26,18 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.segment.readers.LazyRow; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; +import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; +import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; import org.mockito.internal.util.collections.Sets; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstruction; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; @@ -67,6 +68,33 @@ public void testComparisonColumn() { testMerge(false, 2, false, 8, "hoursSinceEpoch", 8, false); } + @Test + public void testCustomPartialUpsertMergerWithNonNullResult() { + GenericRow newRecord = initGenericRow(new GenericRow(), + ImmutableMap.of("pk", "pk1", "field1", 3L, "field2", "inc", "hoursSinceEpoch", 2L)); + LazyRow prevRecord = mock(LazyRow.class); + mockLazyRow(prevRecord, ImmutableMap.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L)); + GenericRow expectedRecord = initGenericRow(new GenericRow(), + ImmutableMap.of("pk", "pk1", "field1", 8L, "field2", "inc", "hoursSinceEpoch", 2L)); + + testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger()); + } + + @Test + public void testCustomPartialUpsertMergerWithNullResult() { + Map newRowData = new HashMap(Map.of("pk", "pk1", "field1", 3L, "field2", "reset")); + newRowData.put("hoursSinceEpoch", null); // testing null comparison column + GenericRow newRecord = initGenericRow(new GenericRow(), newRowData); + LazyRow prevRecord = mock(LazyRow.class); + mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L)); + Map expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L)); + expectedData.put("field1", Long.MIN_VALUE); + GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData); + expectedRecord.addNullValueField("field1"); + + testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger()); + } + public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNewNull, Object newValue, String columnName, Object expectedValue, boolean isExpectedNull) { Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) @@ -81,9 +109,11 @@ public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNe when(mockReader.isNull(1)).thenReturn(isPreviousNull); when(mockReader.getValue(1)).thenReturn(previousValue); })) { + UpsertConfig upsertConfig = new UpsertConfig(); + upsertConfig.setPartialUpsertStrategies(partialUpsertStrategies); + upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.IGNORE); PartialUpsertHandler handler = - spy(new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.IGNORE, - Collections.singletonList("hoursSinceEpoch"))); + spy(new PartialUpsertHandler(schema, Collections.singletonList("hoursSinceEpoch"), upsertConfig)); ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class); when(segment.getColumnNames()).thenReturn(Sets.newSet("field1", "field2", "hoursSinceEpoch")); @@ -96,9 +126,72 @@ public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNe } else { row.putValue(columnName, newValue); } - handler.merge(prevRecord, row); + handler.merge(prevRecord, row, new HashMap<>()); assertEquals(row.getValue(columnName), expectedValue); assertEquals(row.isNullValue(columnName), isExpectedNull); } } + + private void testCustomMerge(LazyRow prevRecord, GenericRow newRecord, GenericRow expectedRecord, + PartialUpsertMerger customMerger) { + + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) + .addSingleValueDimension("field1", FieldSpec.DataType.LONG) + .addSingleValueDimension("field2", FieldSpec.DataType.STRING) + .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") + .setPrimaryKeyColumns(Arrays.asList("pk")).build(); + + UpsertConfig upsertConfig = new UpsertConfig(); + upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE); + upsertConfig.setPartialUpsertMergerClass("org.apache.pinot.segment.local.upsert.CustomPartialUpsertRowMerger"); + + try (MockedStatic partialUpsertMergerFactory = mockStatic( + PartialUpsertMergerFactory.class)) { + when(PartialUpsertMergerFactory.getPartialUpsertMerger(Arrays.asList("pk"), Arrays.asList("hoursSinceEpoch"), + upsertConfig)).thenReturn(customMerger); + PartialUpsertHandler handler = + new PartialUpsertHandler(schema, Collections.singletonList("hoursSinceEpoch"), upsertConfig); + HashMap reuseMergerResult = new HashMap<>(); + handler.merge(prevRecord, newRecord, reuseMergerResult); + assertEquals(newRecord, expectedRecord); + } + } + + public PartialUpsertMerger getCustomMerger() { + return (previousRow, newRow, resultHolder) -> { + if ((newRow.getValue("field2")).equals("set")) { + // use default merger (overwrite) + return; + } + if ((newRow.getValue("field2")).equals("inc")) { + resultHolder.put("field1", (Long) previousRow.getValue("field1") + (Long) newRow.getValue("field1")); + return; + } + if ((newRow.getValue("field2")).equals("reset")) { + resultHolder.put("field1", null); + } + }; + } + + private LazyRow mockLazyRow(LazyRow prevRecord, Map values) { + reset(prevRecord); + when(prevRecord.getColumnNames()).thenReturn(values.keySet()); + for (Map.Entry entry : values.entrySet()) { + when(prevRecord.getValue(entry.getKey())).thenReturn(entry.getValue()); + } + return prevRecord; + } + + private GenericRow initGenericRow(GenericRow genericRow, Map values) { + genericRow.clear(); + for (Map.Entry entry : values.entrySet()) { + String field = entry.getKey(); + Object value = entry.getValue(); + genericRow.putValue(field, value); + if (value == null) { + genericRow.addNullValueField(field); + } + } + return genericRow; + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java new file mode 100644 index 000000000000..72cb75ad47c5 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class PartialUpsertMergerFactoryTest { + + @Test + public void testGetPartialUpsertMerger() { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) + .addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG) + .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") + .setPrimaryKeyColumns(Arrays.asList("pk")).build(); + + UpsertConfig upsertConfig = new UpsertConfig(); + Map partialUpsertStrategies = new HashMap<>(); + partialUpsertStrategies.put("field1", UpsertConfig.Strategy.OVERWRITE); + upsertConfig.setPartialUpsertStrategies(partialUpsertStrategies); + upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.IGNORE); + upsertConfig.setPartialUpsertMergerClass( + "org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger"); + + PartialUpsertMerger partialUpsertMerger = + PartialUpsertMergerFactory.getPartialUpsertMerger(schema.getPrimaryKeyColumns(), + Collections.singletonList("hoursSinceEpoch"), upsertConfig); + + assertNotNull(partialUpsertMerger); + assertTrue(partialUpsertMerger instanceof PartialUpsertColumnarMerger); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerTest.java similarity index 65% rename from pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java rename to pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerTest.java index ae75c81c4c07..dae5ae46de08 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerTest.java @@ -16,18 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; -public class PartialUpsertMergerTest { +public class PartialUpsertColumnMergerTest { @Test public void testAppendMergers() { - AppendMerger appendMerger = new AppendMerger(); + AppendMerger appendMerger = (AppendMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.APPEND); Integer[] array1 = {1, 2, 3}; Integer[] array2 = {3, 4, 6}; @@ -37,21 +38,22 @@ public void testAppendMergers() { @Test public void testIncrementMergers() { - IncrementMerger incrementMerger = new IncrementMerger(); + IncrementMerger incrementMerger = + (IncrementMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.INCREMENT); assertEquals(3, incrementMerger.merge(1, 2)); } @Test public void testIgnoreMergers() { - IgnoreMerger ignoreMerger = new IgnoreMerger(); + IgnoreMerger ignoreMerger = (IgnoreMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.IGNORE); assertEquals(null, ignoreMerger.merge(null, 3)); assertEquals(3, ignoreMerger.merge(3, null)); } @Test public void testMaxMinMergers() { - MaxMerger maxMerger = new MaxMerger(); - MinMerger minMerger = new MinMerger(); + MaxMerger maxMerger = (MaxMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.MAX); + MinMerger minMerger = (MinMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.MIN); assertEquals(1, maxMerger.merge(0, 1)); assertEquals(0, minMerger.merge(0, 1)); assertEquals(1, maxMerger.merge(1, 0)); @@ -60,13 +62,14 @@ public void testMaxMinMergers() { @Test public void testOverwriteMergers() { - OverwriteMerger overwriteMerger = new OverwriteMerger(); + OverwriteMerger overwriteMerger = + (OverwriteMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.OVERWRITE); assertEquals("newValue", overwriteMerger.merge("oldValue", "newValue")); } @Test public void testUnionMergers() { - UnionMerger unionMerger = new UnionMerger(); + UnionMerger unionMerger = (UnionMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.UNION); String[] array1 = {"a", "b", "c"}; String[] array2 = {"c", "d", "e"}; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 906c6ceedfed..848c99199963 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -51,6 +51,9 @@ public enum Strategy { @JsonPropertyDescription("default upsert strategy for partial mode") private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE; + @JsonPropertyDescription("Class name for custom row merger implementation") + private String _partialUpsertMergerClass; + @JsonPropertyDescription("Columns for upsert comparison, default to time column") private List _comparisonColumns; @@ -110,6 +113,10 @@ public Strategy getDefaultPartialUpsertStrategy() { return _defaultPartialUpsertStrategy; } + public String getPartialUpsertMergerClass() { + return _partialUpsertMergerClass; + } + public List getComparisonColumns() { return _comparisonColumns; } @@ -175,6 +182,14 @@ public void setDefaultPartialUpsertStrategy(Strategy defaultPartialUpsertStrateg _defaultPartialUpsertStrategy = defaultPartialUpsertStrategy; } + /** + * Specify to plug a custom implementation for merging rows in partial upsert realtime table. + * @param partialUpsertMergerClass + */ + public void setPartialUpsertMergerClass(String partialUpsertMergerClass) { + _partialUpsertMergerClass = partialUpsertMergerClass; + } + /** * By default, Pinot uses the value in the time column to determine the latest record. For two records with the * same primary key, the record with the larger value of the time column is picked as the