[DRAFT] custom partial upsert row merger #11584

Closed
wants to merge 1 commit into from

rohityadav1993
Contributor

@rohityadav1993 rohityadav1993 commented Sep 13, 2023

feature
release-notes

Problem: This PR addresses a feature gap in partial upsert: conditional column merging. With this change, a table can be configured with a Groovy script or a custom class implementation that merges the column values of the previous and new rows using user-specified logic.

Solution:

  1. A new row abstraction class, LazyRow, reads a previous row's column value from disk only when getValue() is called on it, and caches the value it reads.
  2. A new interface, PartialUpsertRowMergeEvaluator, defines a merge() method that takes the previous and new rows and generates the merged columns as a Map<column_name, value>.
  3. PartialUpsertHandler is now created per partition instead of per table, to avoid concurrent modification of the LazyRow held by PartialUpsertHandler when multiple partitions are consuming.
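The lazy-read-and-cache behaviour described in item 1 could look roughly like the sketch below. It is not the PR's LazyRow: ColumnValueReader, LazyRowSketch and CountingReader are hypothetical names, and the reader interface is an assumed stand-in for reading a column value from a segment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for reading one column value of one document from a segment.
interface ColumnValueReader {
  Object read(int docId, String column);
}

// Sketch of a lazily populated row: a value is fetched on first access and then cached.
class LazyRowSketch {
  private final ColumnValueReader _reader;
  private final int _docId;
  private final Map<String, Object> _fieldToValueMap = new HashMap<>();

  LazyRowSketch(ColumnValueReader reader, int docId) {
    _reader = reader;
    _docId = docId;
  }

  Object getValue(String column) {
    // Note: computeIfAbsent does not store null results, so a null column is re-read each time.
    return _fieldToValueMap.computeIfAbsent(column, c -> _reader.read(_docId, c));
  }
}

// Test helper that counts how often the underlying storage is touched.
class CountingReader implements ColumnValueReader {
  int _reads = 0;

  @Override
  public Object read(int docId, String column) {
    _reads++;
    return column + "@" + docId;
  }
}

class LazyRowDemo {
  public static void main(String[] args) {
    CountingReader reader = new CountingReader();
    LazyRowSketch row = new LazyRowSketch(reader, 7);
    row.getValue("price");
    row.getValue("price");
    System.out.println(reader._reads); // prints 1: the second call is served from the cache
  }
}
```

The plain HashMap mirrors the field shown in the diff below. It is only safe as long as a single thread uses a given row instance.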

TODO:

  1. Add implementation for PartialUpsertRowMergeEvaluator for groovy merger.
  2. Add groovy row merger configs
  3. Add test plan

Design doc: https://docs.google.com/document/d/1bBTCYZFP2stvzc6xZUOEh-XweVgC9WfD7uiSPbKtaZY/edit

private int _docId;
private GenericRow _row;

private HashMap<String, Object> _fieldToValueMap = new HashMap<>();
Contributor

Should this be a ConcurrentHashMap?

Contributor Author

@rohityadav1993 Sep 19, 2023

At least in the current usage, I don't see a need for a ConcurrentHashMap, since the merge happens per row, per segment, on a single thread. Do you see any usage where LazyRow would be referenced by multiple threads?
I am following GenericRow here, which also uses a HashMap.

if (upsertConfig.getRowMergerCustomImplementation() != null) {
// In case of custom merger, the PUH is dependent on LazyRow object to be reused. LazyRow is stateful and
// cause concurrent modification issues, hence a new PUH is created per partition
return new PartialUpsertHandler(_schema, upsertConfig, _comparisonColumns);
Contributor

Got it. Currently the partial upsert handler is initialized in the upsertTableManager and shared by every partitionUpsertManager. Once a LazyRow is held by the partialUpsertHandler, the handler becomes stateful, and since multiple partitions can access it, this leads to concurrent modification issues. So we have to avoid sharing the partial upsert handler among partitions.

@Jackie-Jiang do you see any improvement on this approach?
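To illustrate the concern, here is a minimal sketch of handing out one stateful handler per partition instead of sharing a single instance. StatefulHandlerSketch and TableManagerSketch are hypothetical names and do not reflect Pinot's actual classes; the merge logic is a toy "incoming wins" rule.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stateful handler: it reuses one scratch map across merge calls,
// so two consuming threads must never share the same instance.
class StatefulHandlerSketch {
  private final Map<String, Object> _reusedRow = new HashMap<>();

  Map<String, Object> merge(Map<String, Object> previous, Map<String, Object> incoming) {
    _reusedRow.clear();
    _reusedRow.putAll(previous);
    _reusedRow.putAll(incoming); // incoming values win in this toy merge
    return new HashMap<>(_reusedRow);
  }
}

// Hypothetical table-level manager that creates one handler per partition on demand.
class TableManagerSketch {
  private final Map<Integer, StatefulHandlerSketch> _handlers = new ConcurrentHashMap<>();

  StatefulHandlerSketch handlerFor(int partitionId) {
    return _handlers.computeIfAbsent(partitionId, id -> new StatefulHandlerSketch());
  }
}

class PerPartitionDemo {
  public static void main(String[] args) {
    TableManagerSketch manager = new TableManagerSketch();
    System.out.println(manager.handlerFor(0) == manager.handlerFor(0)); // prints true
    System.out.println(manager.handlerFor(0) == manager.handlerFor(1)); // prints false
  }
}
```

Each partition's consuming thread then only ever touches its own handler, so the reused scratch state needs no synchronization.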

@Jackie-Jiang added the feature, release-notes, Configuration, and upsert labels Sep 17, 2023
@rohityadav1993 force-pushed the masterry branch 2 times, most recently from eb129c0 to 6ae06a9 October 3, 2023 07:11
@rohityadav1993 marked this pull request as ready for review October 3, 2023 07:50
codecov-commenter commented Oct 5, 2023

Codecov Report

Merging #11584 (8951cc9) into master (d1021df) will increase coverage by 0.03%.
Report is 1 commits behind head on master.
The diff coverage is 54.94%.

@@             Coverage Diff              @@
##             master   #11584      +/-   ##
============================================
+ Coverage     63.09%   63.13%   +0.03%     
  Complexity     1117     1117              
============================================
  Files          2342     2344       +2     
  Lines        125900   125986      +86     
  Branches      19362    19375      +13     
============================================
+ Hits          79437    79536      +99     
+ Misses        40817    40790      -27     
- Partials       5646     5660      +14     
Flag Coverage Δ
integration <0.01% <0.00%> (ø)
integration1 <0.01% <0.00%> (ø)
integration2 0.00% <0.00%> (ø)
java-11 63.06% <54.94%> (+12.92%) ⬆️
java-17 62.97% <54.94%> (+0.01%) ⬆️
java-20 62.96% <54.94%> (+0.02%) ⬆️
temurin 63.13% <54.94%> (+0.03%) ⬆️
unittests 63.12% <54.94%> (+0.03%) ⬆️
unittests1 67.25% <54.94%> (+<0.01%) ⬆️
unittests2 14.43% <0.00%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...psert/ConcurrentMapTableUpsertMetadataManager.java 38.46% <100.00%> (+5.12%) ⬆️
...rg/apache/pinot/spi/config/table/UpsertConfig.java 82.69% <33.33%> (-3.03%) ⬇️
...t/local/upsert/BaseTableUpsertMetadataManager.java 73.14% <41.66%> (-3.39%) ⬇️
...not/segment/local/upsert/PartialUpsertHandler.java 78.12% <73.33%> (-5.66%) ⬇️
.../merger/PartialUpsertRowMergeEvaluatorFactory.java 0.00% <0.00%> (ø)
...e/pinot/segment/local/segment/readers/LazyRow.java 60.00% <60.00%> (ø)

... and 15 files with indirect coverage changes


Contributor

@Jackie-Jiang left a comment

I don't think this is a clean abstraction. We should not mix value based merger and row based merger in the same class because it will be very hard to maintain, and the sequence of applying the mergers can be very confusing.

PartialUpsertHandler already provides the row-level method void merge(IndexSegment indexSegment, int docId, GenericRow newRecord), but PartialUpsertMerger has the limitation that it can only work on a single value.
So we have 2 options:

  1. Make PartialUpsertHandler pluggable and make the current one the default implementation
  2. Modify PartialUpsertMerger to take row (something like Object merge(IndexSegment indexSegment, int docId, GenericRow newRecord)) and make it pluggable. Since each column can reference the value from other columns, the contract should be to not change the column value before all the mergers are applied

I'm leaning towards the second approach because that allows us to reuse the existing mergers.
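The contract in option 2 (do not change any column value before all mergers are applied) might be sketched as follows. RowAwareMerger and DeferredMergeSketch are hypothetical names, and plain maps are an assumed stand-in for GenericRow and for the previous row read from the segment.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical row-aware merger: returns the merged value for one column
// and may read any column of either row.
interface RowAwareMerger {
  Object merge(Map<String, Object> previousRow, Map<String, Object> newRow);
}

class DeferredMergeSketch {
  // Runs every merger against the unmodified rows, then applies all results at once.
  static void applyMergers(Map<String, Object> previousRow, Map<String, Object> newRow,
      Map<String, RowAwareMerger> mergers) {
    Map<String, Object> results = new HashMap<>();
    for (Map.Entry<String, RowAwareMerger> entry : mergers.entrySet()) {
      results.put(entry.getKey(), entry.getValue().merge(previousRow, newRow));
    }
    newRow.putAll(results); // newRow is only mutated after all mergers have run
  }

  static int intOf(Map<String, Object> row, String column) {
    return (Integer) row.get(column);
  }

  public static void main(String[] args) {
    Map<String, Object> previous = new HashMap<>();
    previous.put("count", 5);
    Map<String, Object> incoming = new HashMap<>();
    incoming.put("count", 1);

    Map<String, RowAwareMerger> mergers = new LinkedHashMap<>();
    mergers.put("count", (prev, cur) -> intOf(prev, "count") + intOf(cur, "count"));
    // Reads the incoming "count"; it must still see 1 even though "count" is merged first.
    mergers.put("lastIncrement", (prev, cur) -> intOf(cur, "count"));

    applyMergers(previous, incoming, mergers);
    System.out.println(incoming.get("count") + " " + incoming.get("lastIncrement")); // prints 6 1
  }
}
```

Because results are buffered, the order in which mergers run does not affect what each merger observes.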

* There isn't any advantage to have a LazyRow wrap a GenericRow but has been kept for syntactic sugar.
*/
public class LazyRow {
private IndexSegment _segment;
Contributor

(code format) Please follow the Pinot code style.

Contributor Author

rohityadav1993 commented Oct 9, 2023

Thanks for the feedback @Jackie-Jiang

> We should not mix value based merger and row based merger in the same class because it will be very hard to maintain, and the sequence of applying the mergers can be very confusing.

I agree from the maintenance perspective. There could be a requirement to apply both a row merger and column-level mergers, the latter for values that were not computed by the row merger. That was the motivation for keeping them in the same class.
In the current implementation, PUH is responsible for applying the row merger (if specified), followed by the column-level mergers for columns that are not part of the row merger result.

> I'm leaning towards the second approach because that allows us to reuse the existing mergers.

This is a cleaner approach if we can stick to a good contract for merge. Following Approach 2 in the design doc, we could define merge as merge(GenericRow previousRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) and pass an instance of LazyRow, where LazyRow extends GenericRow.

> Since each column can reference the value from other columns, the contract should be to not change the column value before all the mergers are applied

reuseMergerResult will store the intermediate merger results, so that newRecord is not modified until all mergers are applied. We can ensure that reuseMergerResult is applied to newRecord only after the column-level mergers have executed.

If we don't want Map<String, Object> reuseMergerResult in the merge method, then we have to initialise PartialUpsertHandler with reuseMergerResult for each PartitionUpsertMetadataManager instead of for the TableUpsertMetadataManager, to avoid concurrent modification.

@Jackie-Jiang
Contributor

IIUC, LazyRow is introduced to avoid reading the same value from the segment multiple times. I think it is a good abstraction. I suggest making it a wrapper over IndexSegment and allowing the docId to be set, so that the internal fieldToValueMap can be reused; there is no need to allow initializing it with a GenericRow. We can then make the PartialUpsertMerger merge API: void merge(LazyRow previousRow, GenericRow currentRow, Map<String, Object> mergedValues). This way we can even support generating multiple values in one merger.
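A rough sketch of that suggestion, under stated assumptions: SegmentSketch stands in for IndexSegment, a plain Map stands in for GenericRow, and ReusableLazyRow, MergerSketch and MinMaxPriceMerger are hypothetical names. It shows a row object that is re-pointed at a new docId (clearing its cache) and a single merger that writes two output columns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for IndexSegment value lookup.
interface SegmentSketch {
  Object getValue(int docId, String column);
}

// One instance is reused across documents; init() re-points it and drops cached values.
class ReusableLazyRow {
  private final Map<String, Object> _fieldToValueMap = new HashMap<>();
  private SegmentSketch _segment;
  private int _docId;

  void init(SegmentSketch segment, int docId) {
    _segment = segment;
    _docId = docId;
    _fieldToValueMap.clear();
  }

  Object getValue(String column) {
    return _fieldToValueMap.computeIfAbsent(column, c -> _segment.getValue(_docId, c));
  }
}

// Shape of the suggested merge API, with Map standing in for GenericRow.
interface MergerSketch {
  void merge(ReusableLazyRow previousRow, Map<String, Object> currentRow, Map<String, Object> mergedValues);
}

// A single merger that produces two columns from one incoming value.
class MinMaxPriceMerger implements MergerSketch {
  @Override
  public void merge(ReusableLazyRow previousRow, Map<String, Object> currentRow,
      Map<String, Object> mergedValues) {
    int price = (Integer) currentRow.get("price");
    int previousMin = (Integer) previousRow.getValue("minPrice");
    int previousMax = (Integer) previousRow.getValue("maxPrice");
    mergedValues.put("minPrice", Math.min(previousMin, price));
    mergedValues.put("maxPrice", Math.max(previousMax, price));
  }
}

class MultiValueMergeDemo {
  public static void main(String[] args) {
    List<Map<String, Object>> docs = new ArrayList<>();
    Map<String, Object> doc0 = new HashMap<>();
    doc0.put("minPrice", 10);
    doc0.put("maxPrice", 20);
    docs.add(doc0);
    SegmentSketch segment = (docId, column) -> docs.get(docId).get(column);

    ReusableLazyRow previousRow = new ReusableLazyRow();
    previousRow.init(segment, 0);
    Map<String, Object> currentRow = new HashMap<>();
    currentRow.put("price", 25);
    Map<String, Object> mergedValues = new HashMap<>();

    new MinMaxPriceMerger().merge(previousRow, currentRow, mergedValues);
    System.out.println(mergedValues.get("minPrice") + " " + mergedValues.get("maxPrice")); // prints 10 25
  }
}
```

Writing into mergedValues rather than into currentRow keeps the incoming record unchanged until the caller decides to apply the results.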

Contributor Author

rohityadav1993 commented Oct 10, 2023

> IIUC, LazyRow is introduced to avoid reading the same value from the segment multiple times. I think it is a good abstraction (suggestion making it a wrapper over IndexSegment and allow setting a docId so that the internal fieldToValueMap can be reused, no need to allow initializing it with GenericRow), and we can make PartialUpsertMerger merge API: void merge(LazyRow previousRow, GenericRow currentRow, Map<String, Object> mergedValues). This way we can even support generating multiple values in one merger

LazyRow is also used to define a new interface:

public interface PartialUpsertRowMergeEvaluator {
    void evaluate(LazyRow previousRow, LazyRow newRow, Map<String, Object> result);
}

This interface represents the row merge logic and requires LazyRow to be able to wrap a GenericRow.
So this contract should change as well, to something similar to void merge(LazyRow previousRow, GenericRow currentRow, Map<String, Object> mergedValues).

@Jackie-Jiang
Contributor

My suggestion is to not add a new interface, but modify the existing PartialUpsertMerger interface

@rohityadav1993 changed the title from "custom partial upsert row merger" to "[DRAFT] custom partial upsert row merger" Oct 23, 2023
Labels
Configuration, feature, release-notes, upsert

4 participants