From bb7189362b842d05291ada0001b14166076bacb2 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Wed, 8 Nov 2023 12:16:46 +0530 Subject: [PATCH 1/9] refactor existing mergers to represent column mergers --- .../local/upsert/PartialUpsertHandler.java | 16 +++++++------- .../merger/{ => columnar}/AppendMerger.java | 6 ++++-- .../merger/{ => columnar}/IgnoreMerger.java | 6 ++++-- .../{ => columnar}/IncrementMerger.java | 6 ++++-- .../merger/{ => columnar}/MaxMerger.java | 6 ++++-- .../merger/{ => columnar}/MinMerger.java | 6 ++++-- .../{ => columnar}/OverwriteMerger.java | 6 ++++-- .../PartialUpsertColumnMerger.java} | 6 +++--- .../PartialUpsertColumnMergerFactory.java} | 8 +++---- .../merger/{ => columnar}/UnionMerger.java | 4 ++-- .../PartialUpsertColumnMergerTest.java} | 21 +++++++++++-------- 11 files changed, 53 insertions(+), 38 deletions(-) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{ => columnar}/AppendMerger.java (92%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{ => columnar}/IgnoreMerger.java (89%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{ => columnar}/IncrementMerger.java (92%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{ => columnar}/MaxMerger.java (89%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{ => columnar}/MinMerger.java (89%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{ => columnar}/OverwriteMerger.java (89%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{PartialUpsertMerger.java => columnar/PartialUpsertColumnMerger.java} (85%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{PartialUpsertMergerFactory.java => columnar/PartialUpsertColumnMergerFactory.java} (88%) rename pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/{ => columnar}/UnionMerger.java (92%) rename pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/{PartialUpsertMergerTest.java => columnar/PartialUpsertColumnMergerTest.java} (65%) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 8fef9c360276..4d8b3868bbaa 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -22,9 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.pinot.segment.local.segment.readers.LazyRow; -import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger; -import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; -import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; +import org.apache.pinot.segment.local.upsert.merger.columnar.OverwriteMerger; +import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger; +import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -35,19 +35,19 @@ */ public class PartialUpsertHandler { // _column2Mergers maintains the mapping of merge strategies per columns. - private final Map _column2Mergers = new HashMap<>(); - private final PartialUpsertMerger _defaultPartialUpsertMerger; + private final Map _column2Mergers = new HashMap<>(); + private final PartialUpsertColumnMerger _defaultPartialUpsertMerger; private final List _comparisonColumns; private final List _primaryKeyColumns; public PartialUpsertHandler(Schema schema, Map partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy, List comparisonColumns) { - _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); + _defaultPartialUpsertMerger = PartialUpsertColumnMergerFactory.getMerger(defaultPartialUpsertStrategy); _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); for (Map.Entry entry : partialUpsertStrategies.entrySet()) { - _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); + _column2Mergers.put(entry.getKey(), PartialUpsertColumnMergerFactory.getMerger(entry.getValue())); } } @@ -69,7 +69,7 @@ public PartialUpsertHandler(Schema schema, Map pa public void merge(LazyRow prevRecord, GenericRow newRecord) { for (String column : prevRecord.getColumnNames()) { if (!_primaryKeyColumns.contains(column)) { - PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); + PartialUpsertColumnMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); // Non-overwrite mergers // (1) If the value of the previous is null value, skip merging and use the new value // (2) Else If the value of new value is null, use the previous value (even for comparison columns). diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java similarity index 92% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java index 3ed7c9035f70..0f6064a37573 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java @@ -16,14 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; + + /** * Merges 2 records and returns the merged record. * Append the new value from incoming row to the existing value from multi-value field. Then return the merged record. * Append merger allows duplicated records in the multi-value field. */ -public class AppendMerger implements PartialUpsertMerger { +public class AppendMerger implements PartialUpsertColumnMerger { AppendMerger() { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java similarity index 89% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java index c9bb16fdf032..8684def6c984 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java @@ -16,13 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; + + /** * Merges 2 records and returns the merged record. * By default, ignore the new value from incoming row. Then return the merged record. */ -public class IgnoreMerger implements PartialUpsertMerger { +public class IgnoreMerger implements PartialUpsertColumnMerger { IgnoreMerger() { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java similarity index 92% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java index 34b4d0185c0f..9f00ed70b446 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java @@ -16,13 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; + + /** * Merges 2 records and returns the merged record. * Add the new value from incoming row to the existing value from numeric field. Then return the merged record. */ -public class IncrementMerger implements PartialUpsertMerger { +public class IncrementMerger implements PartialUpsertColumnMerger { IncrementMerger() { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java similarity index 89% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java index 9ab421b45d62..6385363cb3ad 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; -public class MaxMerger implements PartialUpsertMerger { + + +public class MaxMerger implements PartialUpsertColumnMerger { MaxMerger() { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java similarity index 89% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java index 49f7f2380f5c..22543a943e76 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; -public class MinMerger implements PartialUpsertMerger { + + +public class MinMerger implements PartialUpsertColumnMerger { MinMerger() { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java similarity index 89% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java index 48795671852e..f47d4cebfd1c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java @@ -16,13 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; + + /** * Merges 2 records and returns the merged record. * Overwrite the existing value for the given field. Then return the merged record. */ -public class OverwriteMerger implements PartialUpsertMerger { +public class OverwriteMerger implements PartialUpsertColumnMerger { OverwriteMerger() { } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java similarity index 85% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java index 817d9531b3ee..edf43a65e3ba 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java @@ -16,11 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; -public interface PartialUpsertMerger { +public interface PartialUpsertColumnMerger { /** - * Handle partial upsert merge. + * Handle partial upsert merge for single column between previous and new row. * * @param previousValue the value of given field from the last derived full record during ingestion. * @param currentValue the value of given field from the new consumed record. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerFactory.java similarity index 88% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerFactory.java index 55e0912c8f5b..1cd44c4e6f6b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerFactory.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; import org.apache.pinot.spi.config.table.UpsertConfig; -public class PartialUpsertMergerFactory { - private PartialUpsertMergerFactory() { +public class PartialUpsertColumnMergerFactory { + private PartialUpsertColumnMergerFactory() { } private static final AppendMerger APPEND_MERGER = new AppendMerger(); @@ -33,7 +33,7 @@ private PartialUpsertMergerFactory() { private static final MinMerger MIN_MERGER = new MinMerger(); private static final UnionMerger UNION_MERGER = new UnionMerger(); - public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) { + public static PartialUpsertColumnMerger getMerger(UpsertConfig.Strategy strategy) { switch (strategy) { case APPEND: return APPEND_MERGER; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java similarity index 92% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java index 2a2c03033faf..0c2067f69d15 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; import java.util.Set; import java.util.TreeSet; @@ -27,7 +27,7 @@ * Added the new value from incoming row to the existing value from multi-value field. Then return the merged record. * Union merger will dedup duplicated records in the multi-value field. */ -public class UnionMerger implements PartialUpsertMerger { +public class UnionMerger implements PartialUpsertColumnMerger { UnionMerger() { } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerTest.java similarity index 65% rename from pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java rename to pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerTest.java index ae75c81c4c07..dae5ae46de08 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMergerTest.java @@ -16,18 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.upsert.merger; +package org.apache.pinot.segment.local.upsert.merger.columnar; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; -public class PartialUpsertMergerTest { +public class PartialUpsertColumnMergerTest { @Test public void testAppendMergers() { - AppendMerger appendMerger = new AppendMerger(); + AppendMerger appendMerger = (AppendMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.APPEND); Integer[] array1 = {1, 2, 3}; Integer[] array2 = {3, 4, 6}; @@ -37,21 +38,22 @@ public void testAppendMergers() { @Test public void testIncrementMergers() { - IncrementMerger incrementMerger = new IncrementMerger(); + IncrementMerger incrementMerger = + (IncrementMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.INCREMENT); assertEquals(3, incrementMerger.merge(1, 2)); } @Test public void testIgnoreMergers() { - IgnoreMerger ignoreMerger = new IgnoreMerger(); + IgnoreMerger ignoreMerger = (IgnoreMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.IGNORE); assertEquals(null, ignoreMerger.merge(null, 3)); assertEquals(3, ignoreMerger.merge(3, null)); } @Test public void testMaxMinMergers() { - MaxMerger maxMerger = new MaxMerger(); - MinMerger minMerger = new MinMerger(); + MaxMerger maxMerger = (MaxMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.MAX); + MinMerger minMerger = (MinMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.MIN); assertEquals(1, maxMerger.merge(0, 1)); assertEquals(0, minMerger.merge(0, 1)); assertEquals(1, maxMerger.merge(1, 0)); @@ -60,13 +62,14 @@ public void testMaxMinMergers() { @Test public void testOverwriteMergers() { - OverwriteMerger overwriteMerger = new OverwriteMerger(); + OverwriteMerger overwriteMerger = + (OverwriteMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.OVERWRITE); assertEquals("newValue", overwriteMerger.merge("oldValue", "newValue")); } @Test public void testUnionMergers() { - UnionMerger unionMerger = new UnionMerger(); + UnionMerger unionMerger = (UnionMerger) PartialUpsertColumnMergerFactory.getMerger(UpsertConfig.Strategy.UNION); String[] array1 = {"a", "b", "c"}; String[] array2 = {"c", "d", "e"}; From aae941bceac4e1edd7045ac6e7ba3765661c49d4 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Thu, 9 Nov 2023 21:49:10 +0530 Subject: [PATCH 2/9] pluggable custom partial upsert merger --- .../BaseTableUpsertMetadataManager.java | 3 +- ...rentMapPartitionUpsertMetadataManager.java | 5 +- .../local/upsert/PartialUpsertHandler.java | 102 ++++++++--------- .../merger/BasePartialUpsertMerger.java | 37 +++++++ .../merger/PartialUpsertColumnarMerger.java | 103 ++++++++++++++++++ .../upsert/merger/PartialUpsertMerger.java | 45 ++++++++ .../merger/PartialUpsertMergerFactory.java | 66 +++++++++++ .../upsert/PartialUpsertHandlerTest.java | 75 +++++++++++-- .../PartialUpsertMergerFactoryTest.java | 57 ++++++++++ .../pinot/spi/config/table/UpsertConfig.java | 15 +++ 10 files changed, 444 insertions(+), 64 deletions(-) create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 47d969e616c2..fd2778350ea1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -64,8 +64,7 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD Preconditions.checkArgument(partialUpsertStrategies != null, "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); partialUpsertHandler = - new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), - comparisonColumns); + new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); } String deleteRecordColumn = upsertConfig.getDeleteRecordColumn(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 735750ff9d97..eff287c57065 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -19,6 +19,8 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.HashMap; import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -49,6 +51,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp // Used to initialize a reference to previous row for merging in partial upsert private final LazyRow _reusePreviousRow = new LazyRow(); + private final HashMap _reuseMergerResult = new HashMap<>(); @VisibleForTesting final ConcurrentHashMap _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); @@ -340,7 +343,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { int currentDocId = recordLocation.getDocId(); if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) { _reusePreviousRow.init(currentSegment, currentDocId); - _partialUpsertHandler.merge(_reusePreviousRow, record); + _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult); } } return recordLocation; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 4d8b3868bbaa..03bd090a80a0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -18,11 +18,13 @@ */ package org.apache.pinot.segment.local.upsert; +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.segment.local.segment.readers.LazyRow; -import org.apache.pinot.segment.local.upsert.merger.columnar.OverwriteMerger; +import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; +import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger; import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -39,67 +41,59 @@ public class PartialUpsertHandler { private final PartialUpsertColumnMerger _defaultPartialUpsertMerger; private final List _comparisonColumns; private final List _primaryKeyColumns; + private final PartialUpsertMerger _partialUpsertMerger; - public PartialUpsertHandler(Schema schema, Map partialUpsertStrategies, - UpsertConfig.Strategy defaultPartialUpsertStrategy, List comparisonColumns) { - _defaultPartialUpsertMerger = PartialUpsertColumnMergerFactory.getMerger(defaultPartialUpsertStrategy); + public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig) { + _defaultPartialUpsertMerger = + PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); - for (Map.Entry entry : partialUpsertStrategies.entrySet()) { - _column2Mergers.put(entry.getKey(), PartialUpsertColumnMergerFactory.getMerger(entry.getValue())); - } + _partialUpsertMerger = + PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); + } + + @VisibleForTesting + public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig, + PartialUpsertMerger partialUpsertMerger) { + _defaultPartialUpsertMerger = + PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); + _comparisonColumns = comparisonColumns; + _primaryKeyColumns = schema.getPrimaryKeyColumns(); + + _partialUpsertMerger = partialUpsertMerger; } - /** - * Merges records and returns the merged record. - * We used a map to indicate all configured fields for partial upsert. For these fields - * (1) If the prev value is null, return the new value - * (2) If the prev record is not null, the new value is null, return the prev value. - * (3) If neither values are not null, then merge the value and return. - * For un-configured fields, they are using default override behavior, regardless null values. - * - * For example, overwrite merger will only override the prev value if the new value is not null. - * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. - * - * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for - * re-reads. - * @param newRecord the new consumed record. - */ - public void merge(LazyRow prevRecord, GenericRow newRecord) { + public void merge(LazyRow prevRecord, GenericRow newRecord, Map reuseMergerResult) { + reuseMergerResult.clear(); + + // merger current row with previously indexed row + _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult); + for (String column : prevRecord.getColumnNames()) { - if (!_primaryKeyColumns.contains(column)) { - PartialUpsertColumnMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); - // Non-overwrite mergers - // (1) If the value of the previous is null value, skip merging and use the new value - // (2) Else If the value of new value is null, use the previous value (even for comparison columns). - // (3) Else If the column is not a comparison column, we applied the merged value to it. - if (!(merger instanceof OverwriteMerger)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - if (newRecord.isNullValue(column)) { - // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of - // using - // multiple comparison columns. We never apply a merge function to it, rather we just take any/all - // non-null comparison column values from the previous record, and the sole non-null comparison column - // value from the new record. - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } else if (!_comparisonColumns.contains(column)) { - newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column))); - } - } + // no merger to apply on primary key columns + if (_primaryKeyColumns.contains(column)) { + continue; + } + // no merger to apply on comparison key column, use previous row's value if current is null + if (_comparisonColumns.contains(column)) { + if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) { + newRecord.putValue(column, prevRecord.getValue(column)); + newRecord.removeNullValueField(column); + } + continue; + } + + // use merged column value from result map + if (reuseMergerResult.containsKey(column)) { + Object mergedValue = reuseMergerResult.get(column); + if (mergedValue != null) { + // remove null value field if it was set + newRecord.removeNullValueField(column); + newRecord.putValue(column, mergedValue); } else { - // Overwrite mergers. - // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value - // (2) Otherwise, if previous is not null, init columnReader and use the previous value. - if (newRecord.isNullValue(column)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } - } + // if column exists but mapped to a null value then merger result was null + newRecord.addNullValueField(column); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java new file mode 100644 index 000000000000..319bb220ec4c --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import java.util.List; +import org.apache.pinot.spi.config.table.UpsertConfig; + + +public abstract class BasePartialUpsertMerger implements PartialUpsertMerger { + + protected final List _primaryKeyColumns; + protected final List _comparisonColumns; + protected final UpsertConfig _upsertConfig; + + public BasePartialUpsertMerger(List primaryKeyColumns, List comparisonColumns, + UpsertConfig upsertConfig) { + _primaryKeyColumns = primaryKeyColumns; + _comparisonColumns = comparisonColumns; + _upsertConfig = upsertConfig; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java new file mode 100644 index 000000000000..8a4d9ba10553 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.segment.local.segment.readers.LazyRow; +import org.apache.pinot.segment.local.upsert.merger.columnar.OverwriteMerger; +import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger; +import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** + * Default Partial upsert merger implementation unless custom implementation is not specified in the UpsertConfig + * PartialUpsertColumnarMerger iterates over each column and merges them based on the defined strategy per column in + * table config. + */ +public class PartialUpsertColumnarMerger extends BasePartialUpsertMerger { + + private final PartialUpsertColumnMerger _defaultColumnValueMerger; + private final Map _column2Mergers = new HashMap<>(); + + public PartialUpsertColumnarMerger(List primaryKeyColumns, List comparisonColumns, + UpsertConfig upsertConfig) { + super(primaryKeyColumns, comparisonColumns, upsertConfig); + _defaultColumnValueMerger = + PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); + Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); + for (Map.Entry entry : partialUpsertStrategies.entrySet()) { + _column2Mergers.put(entry.getKey(), PartialUpsertColumnMergerFactory.getMerger(entry.getValue())); + } + } + + /** + * Merges records and returns the merged record. + * We used a map to indicate all configured fields for partial upsert. For these fields + * (1) If the prev value is null, return the new value + * (2) If the prev record is not null, the new value is null, return the prev value. + * (3) If neither values are not null, then merge the value and return. + * For un-configured fields, they are using default override behavior, regardless null values. + * + * For example, overwrite merger will only override the prev value if the new value is not null. + * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. + * + * @param previousRow the value of given field from the last derived full record during ingestion. + * @param currentRow the value of given field from the new consumed record. + * @param mergerResult merge the values from previous and current row and return as a map + */ + @Override + public void merge(LazyRow previousRow, GenericRow currentRow, Map mergerResult) { + for (String column : previousRow.getColumnNames()) { + // skip any mergers on these columns. PartialUpsertHandler ensures this too. + if (!_primaryKeyColumns.contains(column) && !_comparisonColumns.contains(column)) { + PartialUpsertColumnMerger merger = _column2Mergers.getOrDefault(column, _defaultColumnValueMerger); + /** + * Non-overwrite mergers + * (1) If the value of the previous is null value, skip merging and use the new value + * (2) Else If the value of new value is null, use the previous value (even for comparison columns). + * (3) Else If the column is not a comparison column, we applied the merged value to it. + */ + if (!(merger instanceof OverwriteMerger)) { + Object prevValue = previousRow.getValue(column); + if (prevValue != null) { + if (currentRow.isNullValue(column)) { + mergerResult.put(column, prevValue); + } else { + mergerResult.put(column, merger.merge(prevValue, currentRow.getValue(column))); + } + } + } else { + // Overwrite mergers. + // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value + // (2) Otherwise, use previous value if it is not null + if (currentRow.isNullValue(column)) { + Object prevValue = previousRow.getValue(column); + if (prevValue != null) { + mergerResult.put(column, prevValue); + } + } + } + } + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java new file mode 100644 index 000000000000..9f33fbdc010d --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import java.util.Map; +import org.apache.pinot.segment.local.segment.readers.LazyRow; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** + * Merger previously persisted row with the new incoming row. + *

+ * Implement this interface to define logic to merge rows. {@link LazyRow} provides abstraction row like abstraction + * to read previously persisted row by lazily loading column values if needed. For automatic plugging of the + * interface via {@link org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory} + * implement {@link BasePartialUpsertMerger} + */ +public interface PartialUpsertMerger { + + /** + * Merge previous row with new incoming row and persist the merged results per column in the provided + * mergerResult map. {@link org.apache.pinot.segment.local.upsert.PartialUpsertHandler} ensures the primary key and + * comparison columns are not modified, comparison columns are merged and only the latest non values are stored. + * @param prevRecord + * @param newRecord + * @param mergerResult + */ + public void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult); +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java new file mode 100644 index 000000000000..72009d994267 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.config.table.UpsertConfig; + + +public class PartialUpsertMergerFactory { + + private PartialUpsertMergerFactory() { + } + + /** + * Initialise the default partial upsert merger or initialise a custom implementation from a given class name in + * config + * @param primaryKeyColumns + * @param comparisonColumns + * @param upsertConfig + * @return + */ + public static PartialUpsertMerger getPartialUpsertMerger(List primaryKeyColumns, + List comparisonColumns, UpsertConfig upsertConfig) { + PartialUpsertMerger partialUpsertMerger = null; + String customImplClassName = upsertConfig.getRowMergerCustomImplementation(); + // If a custom implementation is provided in config, initialize an implementation and return. + if (StringUtils.isNotBlank(customImplClassName)) { + try { + Class partialUpsertMergerClass = Class.forName(customImplClassName); + if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) { + throw new RuntimeException("Implementation class is not an implementation of PartialUpsertMerger.class"); + } + partialUpsertMerger = + (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) + .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig); + } catch (ClassNotFoundException + | NoSuchMethodException | InstantiationException | IllegalAccessException + | InvocationTargetException e) { + throw new RuntimeException( + String.format("Could not load partial upsert implementation class by name %s", customImplClassName), e); + } + } else { + // return default implementation + partialUpsertMerger = new PartialUpsertColumnarMerger(primaryKeyColumns, comparisonColumns, upsertConfig); + } + return partialUpsertMerger; + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index f66aa2a67963..04b77be054d2 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.upsert; +import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -25,6 +26,8 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.segment.readers.LazyRow; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; +import org.apache.pinot.segment.local.upsert.merger.BasePartialUpsertMerger; +import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; @@ -33,10 +36,7 @@ import org.mockito.internal.util.collections.Sets; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstruction; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; @@ -81,9 +81,11 @@ public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNe when(mockReader.isNull(1)).thenReturn(isPreviousNull); when(mockReader.getValue(1)).thenReturn(previousValue); })) { + UpsertConfig upsertConfig = new UpsertConfig(); + upsertConfig.setPartialUpsertStrategies(partialUpsertStrategies); + upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.IGNORE); PartialUpsertHandler handler = - spy(new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.IGNORE, - Collections.singletonList("hoursSinceEpoch"))); + spy(new PartialUpsertHandler(schema, Collections.singletonList("hoursSinceEpoch"), upsertConfig)); ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class); when(segment.getColumnNames()).thenReturn(Sets.newSet("field1", "field2", "hoursSinceEpoch")); @@ -96,9 +98,68 @@ public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNe } else { row.putValue(columnName, newValue); } - handler.merge(prevRecord, row); + handler.merge(prevRecord, row, new HashMap<>()); assertEquals(row.getValue(columnName), expectedValue); assertEquals(row.isNullValue(columnName), isExpectedNull); } } + + @Test + public void testPartialUpsertHandlerWithCustomMerger() { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) + .addSingleValueDimension("field1", FieldSpec.DataType.STRING).addMetric("field2", FieldSpec.DataType.LONG) + .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") + .setPrimaryKeyColumns(Arrays.asList("pk")).build(); + UpsertConfig upsertConfig = new UpsertConfig(); + PartialUpsertMerger customMerger = + new BasePartialUpsertMerger(schema.getPrimaryKeyColumns(), Collections.singletonList("hoursSinceEpoch"), + upsertConfig) { + + @Override + public void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult) { + String prevField1 = (String) prevRecord.getValue("field1"); + Long prevField2 = (Long) prevRecord.getValue("field2"); + if (prevField1 != null && "end".equalsIgnoreCase(prevField1)) { + mergerResult.put("field2", newRecord.getValue("field2")); + } else { + mergerResult.put("field2", prevField2 + ((Long) newRecord.getValue("field2"))); + } + } + }; + + PartialUpsertHandler handler = + spy(new PartialUpsertHandler(schema, Collections.singletonList("hoursSinceEpoch"), upsertConfig, customMerger)); + + GenericRow expectedRecord = new GenericRow(); + GenericRow newRecord = new GenericRow(); + LazyRow prevRecord = mock(LazyRow.class); + HashMap reuseMergerResult = new HashMap<>(); + + initGenericRow(newRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 5L)); + mockLazyRow(prevRecord, ImmutableMap.of("pk", "pk1", "field1", "start", "field2", 5L)); + initGenericRow(expectedRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 10L)); + handler.merge(prevRecord, newRecord, reuseMergerResult); + assertEquals(expectedRecord, newRecord); + + initGenericRow(newRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 3L)); + mockLazyRow(prevRecord, ImmutableMap.of("pk", "pk1", "field1", "end", "field2", 5L)); + initGenericRow(expectedRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 3L)); + handler.merge(prevRecord, newRecord, reuseMergerResult); + assertEquals(expectedRecord, newRecord); + } + + private void mockLazyRow(LazyRow prevRecord, Map values) { + reset(prevRecord); + when(prevRecord.getColumnNames()).thenReturn(values.keySet()); + for (Map.Entry entry : values.entrySet()) { + when(prevRecord.getValue(entry.getKey())).thenReturn(entry.getValue()); + } + } + + private void initGenericRow(GenericRow genericRow, Map values) { + genericRow.clear(); + for (Map.Entry entry: values.entrySet()) { + genericRow.putValue(entry.getKey(), entry.getValue()); + } + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java new file mode 100644 index 000000000000..661af515bb54 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert.merger; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class PartialUpsertMergerFactoryTest { + + @Test + public void testGetPartialUpsertMerger() { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) + .addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG) + .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") + .setPrimaryKeyColumns(Arrays.asList("pk")).build(); + + UpsertConfig upsertConfig = new UpsertConfig(); + Map partialUpsertStrategies = new HashMap<>(); + partialUpsertStrategies.put("field1", UpsertConfig.Strategy.OVERWRITE); + upsertConfig.setPartialUpsertStrategies(partialUpsertStrategies); + upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.IGNORE); + upsertConfig.setRowMergerCustomImplementation( + "org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger"); + + PartialUpsertMerger partialUpsertMerger = + PartialUpsertMergerFactory.getPartialUpsertMerger(schema.getPrimaryKeyColumns(), + Collections.singletonList("hoursSinceEpoch"), upsertConfig); + + assertNotNull(partialUpsertMerger); + assertTrue(partialUpsertMerger instanceof PartialUpsertColumnarMerger); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 906c6ceedfed..75905c5767f9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -51,6 +51,9 @@ public enum Strategy { @JsonPropertyDescription("default upsert strategy for partial mode") private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE; + @JsonPropertyDescription("Class name for custom row merger implementation") + private String _rowMergerCustomImplementation; + @JsonPropertyDescription("Columns for upsert comparison, default to time column") private List _comparisonColumns; @@ -110,6 +113,10 @@ public Strategy getDefaultPartialUpsertStrategy() { return _defaultPartialUpsertStrategy; } + public String getRowMergerCustomImplementation() { + return _rowMergerCustomImplementation; + } + public List getComparisonColumns() { return _comparisonColumns; } @@ -175,6 +182,14 @@ public void setDefaultPartialUpsertStrategy(Strategy defaultPartialUpsertStrateg _defaultPartialUpsertStrategy = defaultPartialUpsertStrategy; } + /** + * Specify to plug a custom implementation for merging rows in partial upsert realtime table. + * @param rowMergerCustomImplementation + */ + public void setRowMergerCustomImplementation(String rowMergerCustomImplementation) { + _rowMergerCustomImplementation = rowMergerCustomImplementation; + } + /** * By default, Pinot uses the value in the time column to determine the latest record. For two records with the * same primary key, the record with the larger value of the time column is picked as the From aa8fd9d9715978edaa250a9f96c4a1dcb308d8ca Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Sun, 10 Mar 2024 01:03:26 +0530 Subject: [PATCH 3/9] specify default column merger for custom row merger --- ...rentMapPartitionUpsertMetadataManager.java | 1 - .../local/upsert/PartialUpsertHandler.java | 31 +++-- .../merger/PartialUpsertColumnarMerger.java | 2 +- .../merger/PartialUpsertMergerFactory.java | 13 +- .../upsert/PartialUpsertHandlerTest.java | 120 +++++++++++------- 5 files changed, 100 insertions(+), 67 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index eff287c57065..6915f95667e0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -19,7 +19,6 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.annotations.VisibleForTesting; -import java.io.File; import java.util.HashMap; import java.util.Iterator; import java.util.Objects; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 03bd090a80a0..3bf3a3b156cd 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -18,11 +18,10 @@ */ package org.apache.pinot.segment.local.upsert; -import com.google.common.annotations.VisibleForTesting; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.segment.local.segment.readers.LazyRow; +import org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger; @@ -34,10 +33,15 @@ /** * Handler for partial-upsert. + * + * This class is responsible for merging the new record with the previous record. + * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column, + * it uses the default merge strategy. + * + * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}. + * If a merger for row is defined then it takes precedence and ignores column mergers. */ public class PartialUpsertHandler { - // _column2Mergers maintains the mapping of merge strategies per columns. - private final Map _column2Mergers = new HashMap<>(); private final PartialUpsertColumnMerger _defaultPartialUpsertMerger; private final List _comparisonColumns; private final List _primaryKeyColumns; @@ -53,17 +57,6 @@ public PartialUpsertHandler(Schema schema, List comparisonColumns, Upser PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); } - @VisibleForTesting - public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig, - PartialUpsertMerger partialUpsertMerger) { - _defaultPartialUpsertMerger = - PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); - _comparisonColumns = comparisonColumns; - _primaryKeyColumns = schema.getPrimaryKeyColumns(); - - _partialUpsertMerger = partialUpsertMerger; - } - public void merge(LazyRow prevRecord, GenericRow newRecord, Map reuseMergerResult) { reuseMergerResult.clear(); @@ -92,9 +85,15 @@ public void merge(LazyRow prevRecord, GenericRow newRecord, Map newRecord.removeNullValueField(column); newRecord.putValue(column, mergedValue); } else { - // if column exists but mapped to a null value then merger result was null + // if column exists but mapped to a null value then merger result was a null value newRecord.addNullValueField(column); + newRecord.putValue(column, null); } + } else if (!(_partialUpsertMerger instanceof PartialUpsertColumnarMerger)) { + // PartialUpsertColumnMerger already handles default merger but for any custom implementations + // non merged columns need to be applied with default merger + newRecord.putValue(column, + _defaultPartialUpsertMerger.merge(prevRecord.getValue(column), newRecord.getValue(column))); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java index 8a4d9ba10553..40aa854129a8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java @@ -30,7 +30,7 @@ /** - * Default Partial upsert merger implementation unless custom implementation is not specified in the UpsertConfig + * Default Partial upsert merger implementation. * PartialUpsertColumnarMerger iterates over each column and merges them based on the defined strategy per column in * table config. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java index 72009d994267..96e60cc0d0e0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java @@ -39,14 +39,15 @@ private PartialUpsertMergerFactory() { */ public static PartialUpsertMerger getPartialUpsertMerger(List primaryKeyColumns, List comparisonColumns, UpsertConfig upsertConfig) { - PartialUpsertMerger partialUpsertMerger = null; - String customImplClassName = upsertConfig.getRowMergerCustomImplementation(); + PartialUpsertMerger partialUpsertMerger; + String customRowMerger = upsertConfig.getRowMergerCustomImplementation(); // If a custom implementation is provided in config, initialize an implementation and return. - if (StringUtils.isNotBlank(customImplClassName)) { + if (StringUtils.isNotBlank(customRowMerger)) { try { - Class partialUpsertMergerClass = Class.forName(customImplClassName); + Class partialUpsertMergerClass = Class.forName(customRowMerger); if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) { - throw new RuntimeException("Implementation class is not an implementation of PartialUpsertMerger.class"); + throw new RuntimeException( + "Provided rowMergerCustomImplementation is not an implementation of BasePartialUpsertMerger.class"); } partialUpsertMerger = (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) @@ -55,7 +56,7 @@ public static PartialUpsertMerger getPartialUpsertMerger(List primaryKey | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { throw new RuntimeException( - String.format("Could not load partial upsert implementation class by name %s", customImplClassName), e); + String.format("Could not load partial upsert implementation class by name %s", customRowMerger), e); } } else { // return default implementation diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 04b77be054d2..0a0b0b6a495c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -26,13 +26,14 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.segment.readers.LazyRow; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; -import org.apache.pinot.segment.local.upsert.merger.BasePartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; +import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; import org.mockito.internal.util.collections.Sets; import org.testng.annotations.Test; @@ -67,6 +68,32 @@ public void testComparisonColumn() { testMerge(false, 2, false, 8, "hoursSinceEpoch", 8, false); } + @Test + public void testCustomPartialUpsertMergerWithNonNullResult() { + GenericRow newRecord = initGenericRow(new GenericRow(), + ImmutableMap.of("pk", "pk1", "field1", 3L, "field2", "inc", "hoursSinceEpoch", 2L)); + LazyRow prevRecord = mock(LazyRow.class); + mockLazyRow(prevRecord, ImmutableMap.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L)); + GenericRow expectedRecord = initGenericRow(new GenericRow(), + ImmutableMap.of("pk", "pk1", "field1", 8L, "field2", "inc", "hoursSinceEpoch", 2L)); + + testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger()); + } + + @Test + public void testCustomPartialUpsertMergerWithNullResult() { + Map newRowData = new HashMap(Map.of("pk", "pk1", "field1", 3L, "field2", "reset")); + newRowData.put("hoursSinceEpoch", null); // testing null comparison column + GenericRow newRecord = initGenericRow(new GenericRow(), newRowData); + LazyRow prevRecord = mock(LazyRow.class); + mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L)); + Map expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L)); + expectedData.put("field1", null); + GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData); + + testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger()); + } + public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNewNull, Object newValue, String columnName, Object expectedValue, boolean isExpectedNull) { Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) @@ -104,62 +131,69 @@ public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNe } } - @Test - public void testPartialUpsertHandlerWithCustomMerger() { + private void testCustomMerge(LazyRow prevRecord, GenericRow newRecord, GenericRow expectedRecord, + PartialUpsertMerger customMerger) { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) - .addSingleValueDimension("field1", FieldSpec.DataType.STRING).addMetric("field2", FieldSpec.DataType.LONG) + .addSingleValueDimension("field1", FieldSpec.DataType.LONG) + .addSingleValueDimension("field2", FieldSpec.DataType.STRING) .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") .setPrimaryKeyColumns(Arrays.asList("pk")).build(); + UpsertConfig upsertConfig = new UpsertConfig(); - PartialUpsertMerger customMerger = - new BasePartialUpsertMerger(schema.getPrimaryKeyColumns(), Collections.singletonList("hoursSinceEpoch"), - upsertConfig) { - - @Override - public void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult) { - String prevField1 = (String) prevRecord.getValue("field1"); - Long prevField2 = (Long) prevRecord.getValue("field2"); - if (prevField1 != null && "end".equalsIgnoreCase(prevField1)) { - mergerResult.put("field2", newRecord.getValue("field2")); - } else { - mergerResult.put("field2", prevField2 + ((Long) newRecord.getValue("field2"))); - } - } - }; - - PartialUpsertHandler handler = - spy(new PartialUpsertHandler(schema, Collections.singletonList("hoursSinceEpoch"), upsertConfig, customMerger)); - - GenericRow expectedRecord = new GenericRow(); - GenericRow newRecord = new GenericRow(); - LazyRow prevRecord = mock(LazyRow.class); - HashMap reuseMergerResult = new HashMap<>(); - - initGenericRow(newRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 5L)); - mockLazyRow(prevRecord, ImmutableMap.of("pk", "pk1", "field1", "start", "field2", 5L)); - initGenericRow(expectedRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 10L)); - handler.merge(prevRecord, newRecord, reuseMergerResult); - assertEquals(expectedRecord, newRecord); - - initGenericRow(newRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 3L)); - mockLazyRow(prevRecord, ImmutableMap.of("pk", "pk1", "field1", "end", "field2", 5L)); - initGenericRow(expectedRecord, ImmutableMap.of("pk", "pk1", "field1", "running", "field2", 3L)); - handler.merge(prevRecord, newRecord, reuseMergerResult); - assertEquals(expectedRecord, newRecord); + upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE); + upsertConfig.setRowMergerCustomImplementation("org.apache.pinot.segment.local.upsert.CustomPartialUpsertRowMerger"); + + try (MockedStatic partialUpsertMergerFactory = mockStatic( + PartialUpsertMergerFactory.class)) { + when(PartialUpsertMergerFactory.getPartialUpsertMerger(Arrays.asList("pk"), Arrays.asList("hoursSinceEpoch"), + upsertConfig)).thenReturn(customMerger); + PartialUpsertHandler handler = + new PartialUpsertHandler(schema, Collections.singletonList("hoursSinceEpoch"), upsertConfig); + HashMap reuseMergerResult = new HashMap<>(); + handler.merge(prevRecord, newRecord, reuseMergerResult); + assertEquals(newRecord, expectedRecord); + } } - private void mockLazyRow(LazyRow prevRecord, Map values) { + public PartialUpsertMerger getCustomMerger() { + return new PartialUpsertMerger() { + @Override + public void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult) { + if ((newRecord.getValue("field2")).equals("set")) { + // use default merger (overwrite) + return; + } + if ((newRecord.getValue("field2")).equals("inc")) { + mergerResult.put("field1", (Long) prevRecord.getValue("field1") + (Long) newRecord.getValue("field1")); + return; + } + if ((newRecord.getValue("field2")).equals("reset")) { + mergerResult.put("field1", null); + } + } + }; + } + + private LazyRow mockLazyRow(LazyRow prevRecord, Map values) { reset(prevRecord); when(prevRecord.getColumnNames()).thenReturn(values.keySet()); for (Map.Entry entry : values.entrySet()) { when(prevRecord.getValue(entry.getKey())).thenReturn(entry.getValue()); } + return prevRecord; } - private void initGenericRow(GenericRow genericRow, Map values) { + private GenericRow initGenericRow(GenericRow genericRow, Map values) { genericRow.clear(); - for (Map.Entry entry: values.entrySet()) { - genericRow.putValue(entry.getKey(), entry.getValue()); + for (Map.Entry entry : values.entrySet()) { + String field = entry.getKey(); + Object value = entry.getValue(); + genericRow.putValue(field, value); + if (value == null) { + genericRow.addNullValueField(field); + } } + return genericRow; } } From 982f78134cd69246893f3901427242cba0b63778 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Fri, 15 Mar 2024 00:52:00 +0530 Subject: [PATCH 4/9] custom merger support in table config --- .../segment/local/utils/TableConfigUtils.java | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index a713c81492a9..6b5e305a9e88 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -921,32 +921,40 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); assert upsertConfig != null; Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); + String rowMergerCustomImplementation = upsertConfig.getRowMergerCustomImplementation(); + + Preconditions.checkState(StringUtils.isNotBlank(rowMergerCustomImplementation) + || MapUtils.isNotEmpty(partialUpsertStrategies), + "At least one of rowMergerCustomImplementation or partialUpsertStrategies must be provided for partial upsert table"); List primaryKeyColumns = schema.getPrimaryKeyColumns(); - for (Map.Entry entry : partialUpsertStrategies.entrySet()) { - String column = entry.getKey(); - UpsertConfig.Strategy columnStrategy = entry.getValue(); - Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns"); - - if (upsertConfig.getComparisonColumns() != null) { - Preconditions.checkState(!upsertConfig.getComparisonColumns().contains(column), - "Merger cannot be applied to comparison column"); - } else { - Preconditions.checkState(!tableConfig.getValidationConfig().getTimeColumnName().equals(column), - "Merger cannot be applied to time column"); - } + // skip the partial upsert strategies check if rowMergerCustomImplementation is provided + if (StringUtils.isBlank(rowMergerCustomImplementation)) { + for (Map.Entry entry : partialUpsertStrategies.entrySet()) { + String column = entry.getKey(); + UpsertConfig.Strategy columnStrategy = entry.getValue(); + Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns"); + + if (upsertConfig.getComparisonColumns() != null) { + Preconditions.checkState(!upsertConfig.getComparisonColumns().contains(column), + "Merger cannot be applied to comparison column"); + } else { + Preconditions.checkState(!tableConfig.getValidationConfig().getTimeColumnName().equals(column), + "Merger cannot be applied to time column"); + } - FieldSpec fieldSpec = schema.getFieldSpecFor(column); - Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to non-existing column: %s", column); - - if (columnStrategy == UpsertConfig.Strategy.INCREMENT) { - Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(), - "INCREMENT merger cannot be applied to non-numeric column: %s", column); - Preconditions.checkState(!schema.getDateTimeNames().contains(column), - "INCREMENT merger cannot be applied to date time column: %s", column); - } else if (columnStrategy == UpsertConfig.Strategy.APPEND || columnStrategy == UpsertConfig.Strategy.UNION) { - Preconditions.checkState(!fieldSpec.isSingleValueField(), - "%s merger cannot be applied to single-value column: %s", columnStrategy.toString(), column); + FieldSpec fieldSpec = schema.getFieldSpecFor(column); + Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to non-existing column: %s", column); + + if (columnStrategy == UpsertConfig.Strategy.INCREMENT) { + Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(), + "INCREMENT merger cannot be applied to non-numeric column: %s", column); + Preconditions.checkState(!schema.getDateTimeNames().contains(column), + "INCREMENT merger cannot be applied to date time column: %s", column); + } else if (columnStrategy == UpsertConfig.Strategy.APPEND || columnStrategy == UpsertConfig.Strategy.UNION) { + Preconditions.checkState(!fieldSpec.isSingleValueField(), + "%s merger cannot be applied to single-value column: %s", columnStrategy.toString(), column); + } } } } From 3a3e827ea5c084d27cc7db5cdba43540e80a7e03 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Mon, 18 Mar 2024 12:56:54 +0530 Subject: [PATCH 5/9] address review comments --- .../BaseTableUpsertMetadataManager.java | 5 +- .../local/upsert/PartialUpsertHandler.java | 73 +++++++++++-------- .../upsert/merger/PartialUpsertMerger.java | 2 +- .../merger/PartialUpsertMergerFactory.java | 4 +- .../segment/local/utils/TableConfigUtils.java | 17 +++-- .../upsert/PartialUpsertHandlerTest.java | 2 +- .../PartialUpsertMergerFactoryTest.java | 2 +- .../pinot/spi/config/table/UpsertConfig.java | 12 +-- 8 files changed, 66 insertions(+), 51 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index fd2778350ea1..f96f93bcd904 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -25,6 +25,7 @@ import java.util.Map; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; @@ -61,7 +62,9 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD PartialUpsertHandler partialUpsertHandler = null; if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); - Preconditions.checkArgument(partialUpsertStrategies != null, + String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass(); + Preconditions.checkArgument( + StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null, "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); partialUpsertHandler = new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 3bf3a3b156cd..15f202a07d88 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -24,8 +24,6 @@ import org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; -import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger; -import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -42,14 +40,11 @@ * If a merger for row is defined then it takes precedence and ignores column mergers. */ public class PartialUpsertHandler { - private final PartialUpsertColumnMerger _defaultPartialUpsertMerger; private final List _comparisonColumns; private final List _primaryKeyColumns; private final PartialUpsertMerger _partialUpsertMerger; public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig) { - _defaultPartialUpsertMerger = - PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); @@ -63,38 +58,52 @@ public void merge(LazyRow prevRecord, GenericRow newRecord, Map // merger current row with previously indexed row _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult); - for (String column : prevRecord.getColumnNames()) { - // no merger to apply on primary key columns - if (_primaryKeyColumns.contains(column)) { - continue; + if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) { + // iterate over all columns in prevRecord and update newRecord with merged values + for (String column : prevRecord.getColumnNames()) { + // no merger to apply on primary key columns + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; + } + + // use merged column value from result map + if (reuseMergerResult.containsKey(column)) { + Object mergedValue = reuseMergerResult.get(column); + setMergedValue(newRecord, column, mergedValue); + } } - // no merger to apply on comparison key column, use previous row's value if current is null - if (_comparisonColumns.contains(column)) { - if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) { - newRecord.putValue(column, prevRecord.getValue(column)); - newRecord.removeNullValueField(column); + } else { + // iterate over only merger results and update newRecord with merged values + for (Map.Entry entry : reuseMergerResult.entrySet()) { + // skip if primary key column + String column = entry.getKey(); + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; } - continue; + + Object mergedValue = entry.getValue(); + setMergedValue(newRecord, column, mergedValue); } + } - // use merged column value from result map - if (reuseMergerResult.containsKey(column)) { - Object mergedValue = reuseMergerResult.get(column); - if (mergedValue != null) { - // remove null value field if it was set - newRecord.removeNullValueField(column); - newRecord.putValue(column, mergedValue); - } else { - // if column exists but mapped to a null value then merger result was a null value - newRecord.addNullValueField(column); - newRecord.putValue(column, null); - } - } else if (!(_partialUpsertMerger instanceof PartialUpsertColumnarMerger)) { - // PartialUpsertColumnMerger already handles default merger but for any custom implementations - // non merged columns need to be applied with default merger - newRecord.putValue(column, - _defaultPartialUpsertMerger.merge(prevRecord.getValue(column), newRecord.getValue(column))); + // handle comparison columns + for (String column: _comparisonColumns) { + if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) { + newRecord.putValue(column, prevRecord.getValue(column)); + newRecord.removeNullValueField(column); } } } + + private void setMergedValue(GenericRow newRecord, String column, Object mergedValue) { + if (mergedValue != null) { + // remove null value field if it was set + newRecord.removeNullValueField(column); + newRecord.putValue(column, mergedValue); + } else { + // if column exists but mapped to a null value then merger result was a null value + newRecord.addNullValueField(column); + newRecord.putValue(column, null); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java index 9f33fbdc010d..b9bc2e84c3b8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java @@ -41,5 +41,5 @@ public interface PartialUpsertMerger { * @param newRecord * @param mergerResult */ - public void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult); + void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java index 96e60cc0d0e0..18cac444f301 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java @@ -40,14 +40,14 @@ private PartialUpsertMergerFactory() { public static PartialUpsertMerger getPartialUpsertMerger(List primaryKeyColumns, List comparisonColumns, UpsertConfig upsertConfig) { PartialUpsertMerger partialUpsertMerger; - String customRowMerger = upsertConfig.getRowMergerCustomImplementation(); + String customRowMerger = upsertConfig.getPartialUpsertMergerClass(); // If a custom implementation is provided in config, initialize an implementation and return. if (StringUtils.isNotBlank(customRowMerger)) { try { Class partialUpsertMergerClass = Class.forName(customRowMerger); if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) { throw new RuntimeException( - "Provided rowMergerCustomImplementation is not an implementation of BasePartialUpsertMerger.class"); + "Provided partialUpsertMergerClass is not an implementation of BasePartialUpsertMerger.class"); } partialUpsertMerger = (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 6b5e305a9e88..423ea9167650 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -921,19 +921,22 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); assert upsertConfig != null; Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); - String rowMergerCustomImplementation = upsertConfig.getRowMergerCustomImplementation(); + String partialUpsertMergerClass = upsertConfig.getPartialUpsertMergerClass(); - Preconditions.checkState(StringUtils.isNotBlank(rowMergerCustomImplementation) - || MapUtils.isNotEmpty(partialUpsertStrategies), - "At least one of rowMergerCustomImplementation or partialUpsertStrategies must be provided for partial upsert table"); + Preconditions.checkState( + StringUtils.isNotBlank(partialUpsertMergerClass) || MapUtils.isNotEmpty(partialUpsertStrategies), + "At least one of partialUpsertMergerClass or partialUpsertStrategies must be provided for partial upsert " + + "table"); List primaryKeyColumns = schema.getPrimaryKeyColumns(); - // skip the partial upsert strategies check if rowMergerCustomImplementation is provided - if (StringUtils.isBlank(rowMergerCustomImplementation)) { + // skip the partial upsert strategies check if partialUpsertMergerClass is provided + if (StringUtils.isBlank(partialUpsertMergerClass)) { + // validate partial upsert column mergers for (Map.Entry entry : partialUpsertStrategies.entrySet()) { String column = entry.getKey(); UpsertConfig.Strategy columnStrategy = entry.getValue(); - Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns"); + Preconditions.checkState(!primaryKeyColumns.contains(column), + "Merger cannot be applied to primary key columns"); if (upsertConfig.getComparisonColumns() != null) { Preconditions.checkState(!upsertConfig.getComparisonColumns().contains(column), diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 0a0b0b6a495c..89167494706b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -142,7 +142,7 @@ private void testCustomMerge(LazyRow prevRecord, GenericRow newRecord, GenericRo UpsertConfig upsertConfig = new UpsertConfig(); upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE); - upsertConfig.setRowMergerCustomImplementation("org.apache.pinot.segment.local.upsert.CustomPartialUpsertRowMerger"); + upsertConfig.setPartialUpsertMergerClass("org.apache.pinot.segment.local.upsert.CustomPartialUpsertRowMerger"); try (MockedStatic partialUpsertMergerFactory = mockStatic( PartialUpsertMergerFactory.class)) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java index 661af515bb54..72cb75ad47c5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactoryTest.java @@ -44,7 +44,7 @@ public void testGetPartialUpsertMerger() { partialUpsertStrategies.put("field1", UpsertConfig.Strategy.OVERWRITE); upsertConfig.setPartialUpsertStrategies(partialUpsertStrategies); upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.IGNORE); - upsertConfig.setRowMergerCustomImplementation( + upsertConfig.setPartialUpsertMergerClass( "org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger"); PartialUpsertMerger partialUpsertMerger = diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 75905c5767f9..848c99199963 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -52,7 +52,7 @@ public enum Strategy { private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE; @JsonPropertyDescription("Class name for custom row merger implementation") - private String _rowMergerCustomImplementation; + private String _partialUpsertMergerClass; @JsonPropertyDescription("Columns for upsert comparison, default to time column") private List _comparisonColumns; @@ -113,8 +113,8 @@ public Strategy getDefaultPartialUpsertStrategy() { return _defaultPartialUpsertStrategy; } - public String getRowMergerCustomImplementation() { - return _rowMergerCustomImplementation; + public String getPartialUpsertMergerClass() { + return _partialUpsertMergerClass; } public List getComparisonColumns() { @@ -184,10 +184,10 @@ public void setDefaultPartialUpsertStrategy(Strategy defaultPartialUpsertStrateg /** * Specify to plug a custom implementation for merging rows in partial upsert realtime table. - * @param rowMergerCustomImplementation + * @param partialUpsertMergerClass */ - public void setRowMergerCustomImplementation(String rowMergerCustomImplementation) { - _rowMergerCustomImplementation = rowMergerCustomImplementation; + public void setPartialUpsertMergerClass(String partialUpsertMergerClass) { + _partialUpsertMergerClass = partialUpsertMergerClass; } /** From db12052ce84e67087170b7b4701b1085d5c69e10 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Wed, 3 Apr 2024 01:51:41 +0530 Subject: [PATCH 6/9] fix partial upsert merger class validation --- .../pinot/segment/local/utils/TableConfigUtils.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 423ea9167650..08c3a591ed9c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -923,10 +923,11 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); String partialUpsertMergerClass = upsertConfig.getPartialUpsertMergerClass(); - Preconditions.checkState( - StringUtils.isNotBlank(partialUpsertMergerClass) || MapUtils.isNotEmpty(partialUpsertStrategies), - "At least one of partialUpsertMergerClass or partialUpsertStrategies must be provided for partial upsert " - + "table"); + // check if partialUpsertMergerClass is provided then partialUpsertStrategies should be empty + if (StringUtils.isNotBlank(partialUpsertMergerClass)) { + Preconditions.checkState(MapUtils.isEmpty(partialUpsertStrategies), + "If partialUpsertMergerClass is provided then partialUpsertStrategies should be empty"); + } List primaryKeyColumns = schema.getPrimaryKeyColumns(); // skip the partial upsert strategies check if partialUpsertMergerClass is provided From 917c9d2407b04279934520f4dc794557ac390a24 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Thu, 4 Apr 2024 21:47:28 +0530 Subject: [PATCH 7/9] address review comments --- .../BaseTableUpsertMetadataManager.java | 3 +- ...rentMapPartitionUpsertMetadataManager.java | 1 + .../local/upsert/PartialUpsertHandler.java | 43 ++++++------------- .../merger/PartialUpsertMergerFactory.java | 5 +-- .../upsert/PartialUpsertHandlerTest.java | 3 +- 5 files changed, 19 insertions(+), 36 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index f96f93bcd904..2ac9d88dfcaf 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -66,8 +66,7 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD Preconditions.checkArgument( StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null, "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); - partialUpsertHandler = - new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); + partialUpsertHandler = new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); } String deleteRecordColumn = upsertConfig.getDeleteRecordColumn(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 6915f95667e0..27b7abe952e1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -342,6 +342,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { int currentDocId = recordLocation.getDocId(); if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) { _reusePreviousRow.init(currentSegment, currentDocId); + _reuseMergerResult.clear(); _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 15f202a07d88..f9df9e2e2f15 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -20,11 +20,12 @@ import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.pinot.segment.local.segment.readers.LazyRow; -import org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -43,51 +44,36 @@ public class PartialUpsertHandler { private final List _comparisonColumns; private final List _primaryKeyColumns; private final PartialUpsertMerger _partialUpsertMerger; + private final TreeMap _fieldSpecMap; public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig) { _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); + _fieldSpecMap = schema.getFieldSpecMap(); _partialUpsertMerger = PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); } public void merge(LazyRow prevRecord, GenericRow newRecord, Map reuseMergerResult) { - reuseMergerResult.clear(); // merger current row with previously indexed row _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult); - if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) { - // iterate over all columns in prevRecord and update newRecord with merged values - for (String column : prevRecord.getColumnNames()) { - // no merger to apply on primary key columns - if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { - continue; - } - - // use merged column value from result map - if (reuseMergerResult.containsKey(column)) { - Object mergedValue = reuseMergerResult.get(column); - setMergedValue(newRecord, column, mergedValue); - } + // iterate over only merger results and update newRecord with merged values + for (Map.Entry entry : reuseMergerResult.entrySet()) { + // skip if primary key column + String column = entry.getKey(); + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; } - } else { - // iterate over only merger results and update newRecord with merged values - for (Map.Entry entry : reuseMergerResult.entrySet()) { - // skip if primary key column - String column = entry.getKey(); - if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { - continue; - } - Object mergedValue = entry.getValue(); - setMergedValue(newRecord, column, mergedValue); - } + Object mergedValue = entry.getValue(); + setMergedValue(newRecord, column, mergedValue); } // handle comparison columns - for (String column: _comparisonColumns) { + for (String column : _comparisonColumns) { if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) { newRecord.putValue(column, prevRecord.getValue(column)); newRecord.removeNullValueField(column); @@ -102,8 +88,7 @@ private void setMergedValue(GenericRow newRecord, String column, Object mergedVa newRecord.putValue(column, mergedValue); } else { // if column exists but mapped to a null value then merger result was a null value - newRecord.addNullValueField(column); - newRecord.putValue(column, null); + newRecord.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue()); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java index 18cac444f301..e1ce8c668474 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.segment.local.upsert.merger; -import java.lang.reflect.InvocationTargetException; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -52,9 +51,7 @@ public static PartialUpsertMerger getPartialUpsertMerger(List primaryKey partialUpsertMerger = (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig); - } catch (ClassNotFoundException - | NoSuchMethodException | InstantiationException | IllegalAccessException - | InvocationTargetException e) { + } catch (Exception e) { throw new RuntimeException( String.format("Could not load partial upsert implementation class by name %s", customRowMerger), e); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 89167494706b..a334f48f5b0e 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -88,8 +88,9 @@ public void testCustomPartialUpsertMergerWithNullResult() { LazyRow prevRecord = mock(LazyRow.class); mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L)); Map expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L)); - expectedData.put("field1", null); + expectedData.put("field1", Long.MIN_VALUE); GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData); + expectedRecord.addNullValueField("field1"); testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger()); } From 072b0e53d08491fa813805dafc7bc874702bd633 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Thu, 4 Apr 2024 18:50:16 -0700 Subject: [PATCH 8/9] Minor cleanup --- .../BaseTableUpsertMetadataManager.java | 7 --- ...rentMapPartitionUpsertMetadataManager.java | 7 ++- .../local/upsert/PartialUpsertHandler.java | 36 +++++------ .../merger/BasePartialUpsertMerger.java | 3 +- .../merger/PartialUpsertColumnarMerger.java | 60 +++++++++---------- .../upsert/merger/PartialUpsertMerger.java | 18 ++---- .../merger/PartialUpsertMergerFactory.java | 31 +++------- .../upsert/merger/columnar/AppendMerger.java | 4 -- .../upsert/merger/columnar/IgnoreMerger.java | 4 -- .../merger/columnar/IncrementMerger.java | 4 -- .../upsert/merger/columnar/MaxMerger.java | 5 -- .../upsert/merger/columnar/MinMerger.java | 5 -- .../merger/columnar/OverwriteMerger.java | 4 -- .../columnar/PartialUpsertColumnMerger.java | 1 + .../upsert/merger/columnar/UnionMerger.java | 2 - .../segment/local/utils/TableConfigUtils.java | 7 +-- .../upsert/PartialUpsertHandlerTest.java | 25 ++++---- 17 files changed, 76 insertions(+), 147 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 2ac9d88dfcaf..2fbe4a28e512 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -22,10 +22,8 @@ import java.io.File; import java.util.Collections; import java.util.List; -import java.util.Map; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; @@ -61,11 +59,6 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD PartialUpsertHandler partialUpsertHandler = null; if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { - Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); - String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass(); - Preconditions.checkArgument( - StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null, - "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); partialUpsertHandler = new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 27b7abe952e1..e513e3879eb7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,7 +51,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp // Used to initialize a reference to previous row for merging in partial upsert private final LazyRow _reusePreviousRow = new LazyRow(); - private final HashMap _reuseMergerResult = new HashMap<>(); + private final Map _reuseMergeResultHolder = new HashMap<>(); @VisibleForTesting final ConcurrentHashMap _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); @@ -342,8 +343,8 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { int currentDocId = recordLocation.getDocId(); if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) { _reusePreviousRow.init(currentSegment, currentDocId); - _reuseMergerResult.clear(); - _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult); + _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergeResultHolder); + _reuseMergeResultHolder.clear(); } } return recordLocation; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index f9df9e2e2f15..118412ab7725 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import javax.annotation.Nullable; import org.apache.pinot.segment.local.segment.readers.LazyRow; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; @@ -41,54 +42,49 @@ * If a merger for row is defined then it takes precedence and ignores column mergers. */ public class PartialUpsertHandler { - private final List _comparisonColumns; private final List _primaryKeyColumns; - private final PartialUpsertMerger _partialUpsertMerger; + private final List _comparisonColumns; private final TreeMap _fieldSpecMap; + private final PartialUpsertMerger _partialUpsertMerger; public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig) { - _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); + _comparisonColumns = comparisonColumns; _fieldSpecMap = schema.getFieldSpecMap(); - _partialUpsertMerger = PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); } - public void merge(LazyRow prevRecord, GenericRow newRecord, Map reuseMergerResult) { - - // merger current row with previously indexed row - _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult); + public void merge(LazyRow previousRow, GenericRow newRow, Map resultHolder) { + _partialUpsertMerger.merge(previousRow, newRow, resultHolder); // iterate over only merger results and update newRecord with merged values - for (Map.Entry entry : reuseMergerResult.entrySet()) { - // skip if primary key column + for (Map.Entry entry : resultHolder.entrySet()) { + // skip primary key and comparison columns String column = entry.getKey(); if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { continue; } - - Object mergedValue = entry.getValue(); - setMergedValue(newRecord, column, mergedValue); + setMergedValue(newRow, column, entry.getValue()); } // handle comparison columns for (String column : _comparisonColumns) { - if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) { - newRecord.putValue(column, prevRecord.getValue(column)); - newRecord.removeNullValueField(column); + if (newRow.isNullValue(column) && !previousRow.isNullValue(column)) { + newRow.putValue(column, previousRow.getValue(column)); + newRow.removeNullValueField(column); } } } - private void setMergedValue(GenericRow newRecord, String column, Object mergedValue) { + private void setMergedValue(GenericRow row, String column, @Nullable Object mergedValue) { if (mergedValue != null) { // remove null value field if it was set - newRecord.removeNullValueField(column); - newRecord.putValue(column, mergedValue); + row.removeNullValueField(column); + row.putValue(column, mergedValue); } else { // if column exists but mapped to a null value then merger result was a null value - newRecord.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue()); + row.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue()); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java index 319bb220ec4c..07923fede2b0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/BasePartialUpsertMerger.java @@ -23,12 +23,11 @@ public abstract class BasePartialUpsertMerger implements PartialUpsertMerger { - protected final List _primaryKeyColumns; protected final List _comparisonColumns; protected final UpsertConfig _upsertConfig; - public BasePartialUpsertMerger(List primaryKeyColumns, List comparisonColumns, + protected BasePartialUpsertMerger(List primaryKeyColumns, List comparisonColumns, UpsertConfig upsertConfig) { _primaryKeyColumns = primaryKeyColumns; _comparisonColumns = comparisonColumns; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java index 40aa854129a8..948197e2ac11 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.upsert.merger; +import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,7 +36,6 @@ * table config. */ public class PartialUpsertColumnarMerger extends BasePartialUpsertMerger { - private final PartialUpsertColumnMerger _defaultColumnValueMerger; private final Map _column2Mergers = new HashMap<>(); @@ -45,6 +45,7 @@ public PartialUpsertColumnarMerger(List primaryKeyColumns, List _defaultColumnValueMerger = PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy()); Map partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); + Preconditions.checkArgument(partialUpsertStrategies != null, "Partial upsert strategies must be configured"); for (Map.Entry entry : partialUpsertStrategies.entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertColumnMergerFactory.getMerger(entry.getValue())); } @@ -60,44 +61,39 @@ public PartialUpsertColumnarMerger(List primaryKeyColumns, List * * For example, overwrite merger will only override the prev value if the new value is not null. * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. - * - * @param previousRow the value of given field from the last derived full record during ingestion. - * @param currentRow the value of given field from the new consumed record. - * @param mergerResult merge the values from previous and current row and return as a map */ @Override - public void merge(LazyRow previousRow, GenericRow currentRow, Map mergerResult) { + public void merge(LazyRow previousRow, GenericRow newRow, Map resultHolder) { for (String column : previousRow.getColumnNames()) { - // skip any mergers on these columns. PartialUpsertHandler ensures this too. - if (!_primaryKeyColumns.contains(column) && !_comparisonColumns.contains(column)) { - PartialUpsertColumnMerger merger = _column2Mergers.getOrDefault(column, _defaultColumnValueMerger); - /** - * Non-overwrite mergers - * (1) If the value of the previous is null value, skip merging and use the new value - * (2) Else If the value of new value is null, use the previous value (even for comparison columns). - * (3) Else If the column is not a comparison column, we applied the merged value to it. - */ - if (!(merger instanceof OverwriteMerger)) { + // Skip primary key and comparison columns + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; + } + PartialUpsertColumnMerger merger = _column2Mergers.getOrDefault(column, _defaultColumnValueMerger); + // Non-overwrite mergers + // (1) If the value of the previous is null value, skip merging and use the new value + // (2) Else If the value of new value is null, use the previous value (even for comparison columns) + // (3) Else If the column is not a comparison column, we applied the merged value to it + if (!(merger instanceof OverwriteMerger)) { + Object prevValue = previousRow.getValue(column); + if (prevValue != null) { + if (newRow.isNullValue(column)) { + resultHolder.put(column, prevValue); + } else { + resultHolder.put(column, merger.merge(prevValue, newRow.getValue(column))); + } + } + } else { + // Overwrite mergers + // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value + // (2) Otherwise, use previous value if it is not null + if (newRow.isNullValue(column)) { Object prevValue = previousRow.getValue(column); if (prevValue != null) { - if (currentRow.isNullValue(column)) { - mergerResult.put(column, prevValue); - } else { - mergerResult.put(column, merger.merge(prevValue, currentRow.getValue(column))); - } - } - } else { - // Overwrite mergers. - // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value - // (2) Otherwise, use previous value if it is not null - if (currentRow.isNullValue(column)) { - Object prevValue = previousRow.getValue(column); - if (prevValue != null) { - mergerResult.put(column, prevValue); - } + resultHolder.put(column, prevValue); } } } } } -} +} \ No newline at end of file diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java index b9bc2e84c3b8..2fdd3c821872 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java @@ -24,22 +24,14 @@ /** - * Merger previously persisted row with the new incoming row. - *

- * Implement this interface to define logic to merge rows. {@link LazyRow} provides abstraction row like abstraction - * to read previously persisted row by lazily loading column values if needed. For automatic plugging of the - * interface via {@link org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory} - * implement {@link BasePartialUpsertMerger} + * Merger to merge previously persisted row with the new incoming row. + * Custom implementation can be plugged by implementing this interface and add the class name to the upsert config. */ public interface PartialUpsertMerger { /** - * Merge previous row with new incoming row and persist the merged results per column in the provided - * mergerResult map. {@link org.apache.pinot.segment.local.upsert.PartialUpsertHandler} ensures the primary key and - * comparison columns are not modified, comparison columns are merged and only the latest non values are stored. - * @param prevRecord - * @param newRecord - * @param mergerResult + * Merges previous row with new incoming row and persists the merged results per column in the provided resultHolder. + * Primary key and comparison columns should not be merged because their values are not allowed to be modified. */ - void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult); + void merge(LazyRow previousRow, GenericRow newRow, Map resultHolder); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java index e1ce8c668474..5b3b0148026e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java @@ -24,41 +24,26 @@ public class PartialUpsertMergerFactory { - private PartialUpsertMergerFactory() { } /** - * Initialise the default partial upsert merger or initialise a custom implementation from a given class name in - * config - * @param primaryKeyColumns - * @param comparisonColumns - * @param upsertConfig - * @return + * Returns the default partial upsert merger or a custom implementation from a given class name in the config. */ public static PartialUpsertMerger getPartialUpsertMerger(List primaryKeyColumns, List comparisonColumns, UpsertConfig upsertConfig) { - PartialUpsertMerger partialUpsertMerger; - String customRowMerger = upsertConfig.getPartialUpsertMergerClass(); + String customMergerClassName = upsertConfig.getPartialUpsertMergerClass(); // If a custom implementation is provided in config, initialize an implementation and return. - if (StringUtils.isNotBlank(customRowMerger)) { + if (StringUtils.isNotBlank(customMergerClassName)) { try { - Class partialUpsertMergerClass = Class.forName(customRowMerger); - if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) { - throw new RuntimeException( - "Provided partialUpsertMergerClass is not an implementation of BasePartialUpsertMerger.class"); - } - partialUpsertMerger = - (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) - .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig); + Class partialUpsertMergerClass = Class.forName(customMergerClassName); + return (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) + .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig); } catch (Exception e) { throw new RuntimeException( - String.format("Could not load partial upsert implementation class by name %s", customRowMerger), e); + String.format("Failed to instantiate partial upsert merger with class: %s", customMergerClassName), e); } - } else { - // return default implementation - partialUpsertMerger = new PartialUpsertColumnarMerger(primaryKeyColumns, comparisonColumns, upsertConfig); } - return partialUpsertMerger; + return new PartialUpsertColumnarMerger(primaryKeyColumns, comparisonColumns, upsertConfig); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java index 0f6064a37573..3666ae890203 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/AppendMerger.java @@ -18,16 +18,12 @@ */ package org.apache.pinot.segment.local.upsert.merger.columnar; - - /** * Merges 2 records and returns the merged record. * Append the new value from incoming row to the existing value from multi-value field. Then return the merged record. * Append merger allows duplicated records in the multi-value field. */ public class AppendMerger implements PartialUpsertColumnMerger { - AppendMerger() { - } /** * Append the new value from incoming row to the given multi-value field of previous record. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java index 8684def6c984..16cab35fc333 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IgnoreMerger.java @@ -18,15 +18,11 @@ */ package org.apache.pinot.segment.local.upsert.merger.columnar; - - /** * Merges 2 records and returns the merged record. * By default, ignore the new value from incoming row. Then return the merged record. */ public class IgnoreMerger implements PartialUpsertColumnMerger { - IgnoreMerger() { - } @Override public Object merge(Object previousValue, Object currentValue) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java index 9f00ed70b446..e26dda98c3de 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/IncrementMerger.java @@ -18,15 +18,11 @@ */ package org.apache.pinot.segment.local.upsert.merger.columnar; - - /** * Merges 2 records and returns the merged record. * Add the new value from incoming row to the existing value from numeric field. Then return the merged record. */ public class IncrementMerger implements PartialUpsertColumnMerger { - IncrementMerger() { - } /** * Increment the new value from incoming row to the given field of previous record. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java index 6385363cb3ad..c21ddaecd937 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MaxMerger.java @@ -18,13 +18,8 @@ */ package org.apache.pinot.segment.local.upsert.merger.columnar; - - public class MaxMerger implements PartialUpsertColumnMerger { - MaxMerger() { - } - /** * Keep the maximal value for the given field. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java index 22543a943e76..aeddafe8dd3f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/MinMerger.java @@ -18,13 +18,8 @@ */ package org.apache.pinot.segment.local.upsert.merger.columnar; - - public class MinMerger implements PartialUpsertColumnMerger { - MinMerger() { - } - /** * Keep the minimal value for the given field. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java index f47d4cebfd1c..915a43174199 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/OverwriteMerger.java @@ -18,15 +18,11 @@ */ package org.apache.pinot.segment.local.upsert.merger.columnar; - - /** * Merges 2 records and returns the merged record. * Overwrite the existing value for the given field. Then return the merged record. */ public class OverwriteMerger implements PartialUpsertColumnMerger { - OverwriteMerger() { - } @Override public Object merge(Object previousValue, Object currentValue) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java index edf43a65e3ba..dee7b94b52c2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/PartialUpsertColumnMerger.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.upsert.merger.columnar; public interface PartialUpsertColumnMerger { + /** * Handle partial upsert merge for single column between previous and new row. * diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java index 0c2067f69d15..c0b2eeef54b6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/columnar/UnionMerger.java @@ -28,8 +28,6 @@ * Union merger will dedup duplicated records in the multi-value field. */ public class UnionMerger implements PartialUpsertColumnMerger { - UnionMerger() { - } /** * Union the new value from incoming row to the given multi-value field of previous record. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 08c3a591ed9c..d03e5ab9cfc0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -927,11 +927,8 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche if (StringUtils.isNotBlank(partialUpsertMergerClass)) { Preconditions.checkState(MapUtils.isEmpty(partialUpsertStrategies), "If partialUpsertMergerClass is provided then partialUpsertStrategies should be empty"); - } - - List primaryKeyColumns = schema.getPrimaryKeyColumns(); - // skip the partial upsert strategies check if partialUpsertMergerClass is provided - if (StringUtils.isBlank(partialUpsertMergerClass)) { + } else { + List primaryKeyColumns = schema.getPrimaryKeyColumns(); // validate partial upsert column mergers for (Map.Entry entry : partialUpsertStrategies.entrySet()) { String column = entry.getKey(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index a334f48f5b0e..4b954aa1400e 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -158,20 +158,17 @@ private void testCustomMerge(LazyRow prevRecord, GenericRow newRecord, GenericRo } public PartialUpsertMerger getCustomMerger() { - return new PartialUpsertMerger() { - @Override - public void merge(LazyRow prevRecord, GenericRow newRecord, Map mergerResult) { - if ((newRecord.getValue("field2")).equals("set")) { - // use default merger (overwrite) - return; - } - if ((newRecord.getValue("field2")).equals("inc")) { - mergerResult.put("field1", (Long) prevRecord.getValue("field1") + (Long) newRecord.getValue("field1")); - return; - } - if ((newRecord.getValue("field2")).equals("reset")) { - mergerResult.put("field1", null); - } + return (previousRow, newRow, resultHolder) -> { + if ((newRow.getValue("field2")).equals("set")) { + // use default merger (overwrite) + return; + } + if ((newRow.getValue("field2")).equals("inc")) { + resultHolder.put("field1", (Long) previousRow.getValue("field1") + (Long) newRow.getValue("field1")); + return; + } + if ((newRow.getValue("field2")).equals("reset")) { + resultHolder.put("field1", null); } }; } From edf3ab8d24fb0fb7843f6fa18bfc9b658d657d1a Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Thu, 4 Apr 2024 19:27:38 -0700 Subject: [PATCH 9/9] Fix linter --- .../local/upsert/merger/PartialUpsertColumnarMerger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java index 948197e2ac11..3e510a8bcf9e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertColumnarMerger.java @@ -96,4 +96,4 @@ public void merge(LazyRow previousRow, GenericRow newRow, Map re } } } -} \ No newline at end of file +}