Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rohityadav1993 committed Apr 4, 2024
1 parent db12052 commit 917c9d2
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
Preconditions.checkArgument(
StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null,
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
partialUpsertHandler =
new PartialUpsertHandler(schema, comparisonColumns, upsertConfig);
partialUpsertHandler = new PartialUpsertHandler(schema, comparisonColumns, upsertConfig);
}

String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
int currentDocId = recordLocation.getDocId();
if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
_reusePreviousRow.init(currentSegment, currentDocId);
_reuseMergerResult.clear();
_partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;

Expand All @@ -43,51 +44,36 @@ public class PartialUpsertHandler {
private final List<String> _comparisonColumns;
private final List<String> _primaryKeyColumns;
private final PartialUpsertMerger _partialUpsertMerger;
private final TreeMap<String, FieldSpec> _fieldSpecMap;

public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
_comparisonColumns = comparisonColumns;
_primaryKeyColumns = schema.getPrimaryKeyColumns();
_fieldSpecMap = schema.getFieldSpecMap();

_partialUpsertMerger =
PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
}

public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
reuseMergerResult.clear();

// merger current row with previously indexed row
_partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);

if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) {
// iterate over all columns in prevRecord and update newRecord with merged values
for (String column : prevRecord.getColumnNames()) {
// no merger to apply on primary key columns
if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) {
continue;
}

// use merged column value from result map
if (reuseMergerResult.containsKey(column)) {
Object mergedValue = reuseMergerResult.get(column);
setMergedValue(newRecord, column, mergedValue);
}
// iterate over only merger results and update newRecord with merged values
for (Map.Entry<String, Object> entry : reuseMergerResult.entrySet()) {
// skip if primary key column
String column = entry.getKey();
if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) {
continue;
}
} else {
// iterate over only merger results and update newRecord with merged values
for (Map.Entry<String, Object> entry : reuseMergerResult.entrySet()) {
// skip if primary key column
String column = entry.getKey();
if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) {
continue;
}

Object mergedValue = entry.getValue();
setMergedValue(newRecord, column, mergedValue);
}
Object mergedValue = entry.getValue();
setMergedValue(newRecord, column, mergedValue);
}

// handle comparison columns
for (String column: _comparisonColumns) {
for (String column : _comparisonColumns) {
if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
newRecord.putValue(column, prevRecord.getValue(column));
newRecord.removeNullValueField(column);
Expand All @@ -102,8 +88,7 @@ private void setMergedValue(GenericRow newRecord, String column, Object mergedVa
newRecord.putValue(column, mergedValue);
} else {
// if column exists but mapped to a null value then merger result was a null value
newRecord.addNullValueField(column);
newRecord.putValue(column, null);
newRecord.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.segment.local.upsert.merger;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.UpsertConfig;
Expand Down Expand Up @@ -52,9 +51,7 @@ public static PartialUpsertMerger getPartialUpsertMerger(List<String> primaryKey
partialUpsertMerger =
(PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class)
.newInstance(primaryKeyColumns, comparisonColumns, upsertConfig);
} catch (ClassNotFoundException
| NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException e) {
} catch (Exception e) {
throw new RuntimeException(
String.format("Could not load partial upsert implementation class by name %s", customRowMerger), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ public void testCustomPartialUpsertMergerWithNullResult() {
LazyRow prevRecord = mock(LazyRow.class);
mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L));
Map<String, Object> expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L));
expectedData.put("field1", null);
expectedData.put("field1", Long.MIN_VALUE);
GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData);
expectedRecord.addNullValueField("field1");

testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger());
}
Expand Down

0 comments on commit 917c9d2

Please sign in to comment.