Skip to content

Commit

Permalink
custom partial upsert row merger
Browse files Browse the repository at this point in the history
  • Loading branch information
rohityadav1993 committed Oct 5, 2023
1 parent d1021df commit 8951cc9
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.readers;

import java.io.IOException;
import java.util.HashMap;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.data.readers.GenericRow;


/**
* A wrapper class to read column values of a row. The wrapper can either be over a {@link IndexSegment} and docId or
* over a {@link GenericRow}<br>
* The advantage of having wrapper over segment and docId is column values are read only when
* {@link LazyRow#getValue(String)} is invoked.
* This is useful to reduce the disk reads incurred due to loading the previous row during merge step.
* There isn't any advantage to have a LazyRow wrap a GenericRow but has been kept for syntactic sugar.
*/
public class LazyRow {
private IndexSegment _segment;
private int _docId;
private GenericRow _row;

private HashMap<String, Object> _fieldToValueMap = new HashMap<>();

public LazyRow() {
}

public LazyRow(GenericRow row) {
_row = row;
}

public LazyRow(IndexSegment segment, int docId) {
_segment = segment;
_docId = docId;
}

public void setRow(GenericRow row) {
_row = row;
}

public void init(IndexSegment segment, int docId) {
this.clear();
_segment = segment;
_docId = docId;
}

public void init(GenericRow row) {
this.clear();
_row = row;
}

public Object getValue(String column) {

if (_row != null) {
if (_row.isNullValue(column)) {
return null;
}
return _row.getValue(column);
}
return _fieldToValueMap.computeIfAbsent(column, col -> {
Object value = null;
try (PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(_segment, col)) {
if (!columnReader.isNull(_docId)) {
value = columnReader.getValue(_docId);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return value;
});
}

public void clear() {
_row = null;
_fieldToValueMap.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
Preconditions.checkArgument(partialUpsertStrategies != null,
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
_partialUpsertHandler =
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
_comparisonColumns);
new PartialUpsertHandler(schema, upsertConfig, _comparisonColumns);
}

_enableSnapshot = upsertConfig.isEnableSnapshot();
Expand Down Expand Up @@ -145,6 +144,29 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
}
}

/**
* Reuses a PartialUpsertHandler if there is no custom row merger otherwise creates a new instance to avoid
* concurrent modification of state(LazyRow) between segments of the table
* @return partial upsert handler to merge previous and new row
*/
protected PartialUpsertHandler getPartialUpsertHandler() {
UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
Preconditions.checkArgument(partialUpsertStrategies != null,
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s",
_tableNameWithType);
if (upsertConfig.getRowMergerCustomImplementation() != null) {
// In case of custom merger, the PUH is dependent on LazyRow object to be reused. LazyRow is stateful and
// cause concurrent modification issues, hence a new PUH is created per partition
return new PartialUpsertHandler(_schema, upsertConfig, _comparisonColumns);
} else {
return _partialUpsertHandler;
}
}
return null;
}

/**
* Can be overridden to initialize custom variables after other variables are set but before preload starts. This is
* needed because preload will load segments which might require these custom variables.
Expand Down Expand Up @@ -271,6 +293,9 @@ public boolean isPreloading() {

@Override
public UpsertConfig.Mode getUpsertMode() {
return _partialUpsertHandler == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL;
return _tableConfig.getUpsertConfig() != null
&& _tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL
? UpsertConfig.Mode.PARTIAL
: UpsertConfig.Mode.FULL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
_comparisonColumns, _deleteRecordColumn, _hashFunction, getPartialUpsertHandler(),
_enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pinot.segment.local.upsert;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertRowMergeEvaluator;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertRowMergeEvaluatorFactory;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
Expand All @@ -41,16 +45,21 @@ public class PartialUpsertHandler {
private final PartialUpsertMerger _defaultPartialUpsertMerger;
private final List<String> _comparisonColumns;
private final List<String> _primaryKeyColumns;
private PartialUpsertRowMergeEvaluator _rowMerger;
private LazyRow _reusePreviousLazyRow;
private LazyRow _reuseNewLazyRow;
private Map<String, Object> _reuseRowMergerResult;

public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
_defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
public PartialUpsertHandler(Schema schema, UpsertConfig upsertConfig, List<String> comparisonColumns) {
_defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy());
_comparisonColumns = comparisonColumns;
_primaryKeyColumns = schema.getPrimaryKeyColumns();

for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
for (Map.Entry<String, UpsertConfig.Strategy> entry : upsertConfig.getPartialUpsertStrategies().entrySet()) {
_column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
}

initRowMerger(upsertConfig);
}

/**
Expand All @@ -69,7 +78,31 @@ public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> pa
* @param newRecord the new consumed record.
*/
public void merge(IndexSegment indexSegment, int docId, GenericRow newRecord) {

// If a row merger is initialised then evaluate it and get the results of merger
if (_rowMerger != null) {
_reuseRowMergerResult.clear();
_reusePreviousLazyRow.init(indexSegment, docId);
_reuseNewLazyRow.init(newRecord);
_rowMerger.evaluate(_reusePreviousLazyRow, _reuseNewLazyRow, _reuseRowMergerResult);
}

for (String column : indexSegment.getColumnNames()) {

// use result from custom merger result if present
if (_reuseRowMergerResult != null && _reuseRowMergerResult.containsKey(column)) {
if (!_primaryKeyColumns.contains(column) && !_comparisonColumns.contains(column)) {
Object mergedValue = _reuseRowMergerResult.get(column);
if (mergedValue == null) {
newRecord.addNullValueField(column);
} else {
newRecord.removeNullValueField(column);
newRecord.putValue(column, mergedValue);
}
}
// skip any other partial upsert for this column
continue;
}
if (!_primaryKeyColumns.contains(column)) {
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
// Non-overwrite mergers
Expand Down Expand Up @@ -117,4 +150,26 @@ public void merge(IndexSegment indexSegment, int docId, GenericRow newRecord) {
}
}
}

private void initRowMerger(UpsertConfig upsertConfig) {
// If custom row merger is specified initialize row merger.
String rowMergerCustomImplementation = upsertConfig.getRowMergerCustomImplementation();
if (rowMergerCustomImplementation != null && !rowMergerCustomImplementation.equals("")) {
try {
setRowMerger(PartialUpsertRowMergeEvaluatorFactory.getInstance(rowMergerCustomImplementation));
} catch (Exception e) {
throw new RuntimeException("Cannot create partial upsert row merger", e);
}
} else {
_rowMerger = null;
}
}

@VisibleForTesting
public void setRowMerger(PartialUpsertRowMergeEvaluator rowMerger) {
_rowMerger = rowMerger;
_reusePreviousLazyRow = new LazyRow();
_reuseNewLazyRow = new LazyRow();
_reuseRowMergerResult = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.upsert.merger;

import java.util.Map;
import org.apache.pinot.segment.local.segment.readers.LazyRow;


public interface PartialUpsertRowMergeEvaluator {
void evaluate(LazyRow previousRow, LazyRow newRow, Map<String, Object> result);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.upsert.merger;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class PartialUpsertRowMergeEvaluatorFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(PartialUpsertRowMergeEvaluatorFactory.class);

private PartialUpsertRowMergeEvaluatorFactory() {
}

public static PartialUpsertRowMergeEvaluator getInstance(String implementationClassName) {
LOGGER.info("Creating PartialUpsertRowMergeEvaluator for class {}", implementationClassName);
if (StringUtils.isBlank(implementationClassName)) {
throw new IllegalArgumentException("Empty implementationClassName");
}
try {
Class<?> aClass = Class.forName(implementationClassName);
if (!PartialUpsertRowMergeEvaluator.class.isAssignableFrom(aClass)) {
throw new IllegalArgumentException(
"The provided class is not an implementation of PartialUpsertRowMergeEvaluator");
}
return (PartialUpsertRowMergeEvaluator) aClass.getConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to create a PartialUpsertRowMergeEvaluator with class %s",
implementationClassName), e);
}
}
}
Loading

0 comments on commit 8951cc9

Please sign in to comment.