-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DRAFT] custom partial upsert row merger #11584
Conversation
private int _docId; | ||
private GenericRow _row; | ||
|
||
private HashMap<String, Object> _fieldToValueMap = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
concurrentHashMap ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least in the current usage, I don't see a requirement to have a ConcurrentHashMap as the merger happens per row per segment in a single thread manner. Do you see any usage where we want to use LazyRow and there can be multiple threads referencing it?
I am taking reference from GenericRow which also using hashmap.
if (upsertConfig.getRowMergerCustomImplementation() != null) { | ||
// In case of custom merger, the PUH is dependent on LazyRow object to be reused. LazyRow is stateful and | ||
// cause concurrent modification issues, hence a new PUH is created per partition | ||
return new PartialUpsertHandler(_schema, upsertConfig, _comparisonColumns); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. currently the partial upsert handler are initialized in the upsertTableManager and shared by partitionUpsertManager. After lazyRow passed in the partialUpsertHandler it become stateful, and multiple partitions can access the partial upsert handler. this leads to concurrent modification issue. So we have to avoid sharing the partial upsert handler among partitions.
@Jackie-Jiang do you see any improvement on this approach?
...-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
Outdated
Show resolved
Hide resolved
...java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertRowMergeEvaluatorFactory.java
Outdated
Show resolved
Hide resolved
...java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertRowMergeEvaluatorFactory.java
Outdated
Show resolved
Hide resolved
eb129c0
to
6ae06a9
Compare
6ae06a9
to
8951cc9
Compare
Codecov Report
@@ Coverage Diff @@
## master #11584 +/- ##
============================================
+ Coverage 63.09% 63.13% +0.03%
Complexity 1117 1117
============================================
Files 2342 2344 +2
Lines 125900 125986 +86
Branches 19362 19375 +13
============================================
+ Hits 79437 79536 +99
+ Misses 40817 40790 -27
- Partials 5646 5660 +14
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 15 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is a clean abstraction. We should not mix value based merger and row based merger in the same class because it will be very hard to maintain, and the sequence of applying the mergers can be very confusing.
PartialUpsertHandler
already provides the row level method void merge(IndexSegment indexSegment, int docId, GenericRow newRecord)
, but PartialUpsertMerger
has limitation that it can only work on a single value.
So we have 2 options:
- Make
PartialUpsertHandler
pluggable and make the current one the default implementation - Modify
PartialUpsertMerger
to take row (something likeObject merge(IndexSegment indexSegment, int docId, GenericRow newRecord)
) and make it pluggable. Since each column can reference the value from other columns, the contract should be to not change the column value before all the mergers are applied
I'm leaning towards the second approach because that allows us to reuse the existing mergers.
* There isn't any advantage to have a LazyRow wrap a GenericRow but has been kept for syntactic sugar. | ||
*/ | ||
public class LazyRow { | ||
private IndexSegment _segment; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(code format) Please follow Pinot Style
Thanks for the feedback @Jackie-Jiang
I agree with the maintenance perspective. There could be a requirement to have both row merger and column level merger for values which were not computed in row merger. Hence, there was the motivation to keep them in the same class.
This is a cleaner approach if we can stick to a good contract for merge. from the design doc Approach 2, if we can define merge as
If we don't want to have |
IIUC, |
It is also used for defining new interface:
which represents the row merge logic, which requires LazyRow to wrap GenericRow. |
My suggestion is to not add a new interface, but modify the existing |
feature
release-notes
Problem: The PR addresses the feature gap of conditional column merger in partial upsert. With this, a table can be configured using a groovy script/custom class implementation to merge previous and new row column values based on a user-specified logic between previous and new row columns values.
Solution:
TODO:
Design doc: https://docs.google.com/document/d/1bBTCYZFP2stvzc6xZUOEh-XweVgC9WfD7uiSPbKtaZY/edit