-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pluggable partial upsert merger #11983
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #11983 +/- ##
============================================
+ Coverage 61.75% 62.08% +0.33%
+ Complexity 207 198 -9
============================================
Files 2436 2465 +29
Lines 133233 134846 +1613
Branches 20636 20834 +198
============================================
+ Hits 82274 83720 +1446
- Misses 44911 44954 +43
- Partials 6048 6172 +124
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Please add some description to the PR, including what support is added, how to configure it etc. |
3e4f005
to
7ec1c8a
Compare
@Jackie-Jiang, please review, added the necessary details. |
@@ -106,8 +106,8 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD | |||
Preconditions.checkArgument(partialUpsertStrategies != null, | |||
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); | |||
_partialUpsertHandler = | |||
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), | |||
_comparisonColumns); | |||
new PartialUpsertHandler(schema, tableConfig.getValidationConfig().getTimeColumnName(), _comparisonColumns, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we get _comparisonColumns from upsertConfig?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not pass in time column. We can pass in _comparisonColumns
and upsertConfig
...-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
Outdated
Show resolved
Hide resolved
@@ -106,8 +106,8 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD | |||
Preconditions.checkArgument(partialUpsertStrategies != null, | |||
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); | |||
_partialUpsertHandler = | |||
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), | |||
_comparisonColumns); | |||
new PartialUpsertHandler(schema, tableConfig.getValidationConfig().getTimeColumnName(), _comparisonColumns, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not pass in time column. We can pass in _comparisonColumns
and upsertConfig
@@ -307,7 +309,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { | |||
int currentDocId = recordLocation.getDocId(); | |||
if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) { | |||
_reusePreviousRow.init(currentSegment, currentDocId); | |||
_partialUpsertHandler.merge(_reusePreviousRow, record); | |||
_partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel this reusable map should be maintained within the PartialUpsertHandler
. I don't see why it should be part of the interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The_reuseMergerResult
map is defined in the context of a partition for a consuming segment. If we move it to PartialUpsertHandler
then we need to modify PartialUpsertHandler
to be initialised in BasePartitionUpsertMetadataManager
instead of BaseTableUpsertMetadataManager
. Moreover the _reuseMergerResult
would have to be made threadsafe across consuming segments.
...-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
Outdated
Show resolved
Hide resolved
7ec1c8a
to
a8d7723
Compare
The integration test failure is in delete table flow which is unrelated to the proposed changes. |
...nt-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
Outdated
Show resolved
Hide resolved
// PartialUpsertColumnMerger already handles default merger but for any custom implementations | ||
// non merged columns need to be applied with default merger | ||
newRecord.putValue(column, | ||
_defaultPartialUpsertMerger.merge(prevRecord.getValue(column), newRecord.getValue(column))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default strategy is columnar based. Do we want to apply it for other mergers? I feel it is more intuitive if all the merge logic is handled within the merger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do not handle default merge here, we may iterate over the merger result to reduce map lookups
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, this was done with intention to keep the logic in the merger class minimal. But the optmization on map iteration is a better option. Let me update this.
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
Outdated
Show resolved
Hide resolved
@@ -918,32 +918,43 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche | |||
UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); | |||
assert upsertConfig != null; | |||
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); | |||
String partialUpsertMergerClass = upsertConfig.getPartialUpsertMergerClass(); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Jackie-Jiang , @deemoliu , we may want to disallow defualtMergerStrategy to be defined when using partialUpsertMergerClass. This will require us to modify UpsertConfig.class default initialization: private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE;
And handle this initilization in POST /tableConfigs
c6bca7c
to
6b6a184
Compare
@Jackie-Jiang @deemoliu , please review the updated changes. |
6b6a184
to
519108f
Compare
519108f
to
db12052
Compare
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); | ||
partialUpsertHandler = | ||
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), | ||
comparisonColumns); | ||
new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(format) Please auto reformat all the changes with Pinot Style
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); | ||
partialUpsertHandler = | ||
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), | ||
comparisonColumns); | ||
new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(format) Please auto-reformat all the changes with Pinot Style
String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass(); | ||
Preconditions.checkArgument( | ||
StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) These checks can be pushed down to the constructor of PartialUpsertHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the checks are moved down to PartialUpsertHandler
, the create table API response is success but the table goes in error state. This check would pro-actively fail table creation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've applied a commit to demonstrate the idea. The exception will be thrown from the constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thank you Jackie.
} | ||
} | ||
public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) { | ||
reuseMergerResult.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) Clear the result on the caller side
// merger current row with previously indexed row | ||
_partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult); | ||
|
||
if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to differentiate PartialUpsertColumnarMerger
and custom merger? The logic should be the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to keep the logic for PartialUpsertColumnarMerger
(behaviour so far) to be unmodified and was doing null handling wrongly for custom merger. I have removed the differentiation and using putDefaultNullValue()
for null handling based on another review comment. We will never have a null merger result from PartialUpsertColumnarMerger
.
} else { | ||
// if column exists but mapped to a null value then merger result was a null value | ||
newRecord.addNullValueField(column); | ||
newRecord.putValue(column, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(MAJOR) This won't work. In order to set a value to null, you'll need to call putDefaultNullValue()
with a default null value. Do you see a scenario where you want to explicitly set a value to null
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might not be fully aware of the null value support feature, let me update it with putDefaultNullValue()
. Thank you. I don't forsee a scenario for null value.
} catch (ClassNotFoundException | ||
| NoSuchMethodException | InstantiationException | IllegalAccessException | ||
| InvocationTargetException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) These can be merged as Exception e
if (StringUtils.isNotBlank(customRowMerger)) { | ||
try { | ||
Class<?> partialUpsertMergerClass = Class.forName(customRowMerger); | ||
if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does it have to extend BasePartialUpsertMerger
? Implementing PartialUpsertMerger
should be good right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BasePartialUpsertMerger
defines a constructor which is be used to initialize a custom merger class by overriding the constructor:
(PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig);
79dc192
to
917c9d2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Applied a commit for some minor cleanups
feature
refactor
release-notes
Implements: #11174
This PR introduces the possibility to define row mergers(currently only per column merging happens) for partial upsert. This allows a table to define complex logic for merging in partial upsert table. An analogy is painless script in ES.
This is the second PR for the feature described in this design doc.
Makes
PartialUpsertHandler
do row merging usingPartialUpsertMerger
.Based on previous feedback comment.
PartialUpsertMerger
as:public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> mergerResult)
PartialUpsertMerger
PartialUpsertHandler
initiliazes an implementation ofPartialUpsertMerger
usingPartialUpsertMergerFactory
.Adds the possibility to define custom merging per table and specify as table config.
PartialUpsertColumnarMerger
(this is existing columnar merging logic extracted out ofPartialUpsertHandler
) if column level merger strategies are defined.PartialUpsertMergerFactory
provides the implementation for same. e.g._Note: When custom partialUpsertMergerClasscu is provided then the column merger strategies will not be applied. The
defaultPartialUpsertStrategy
is also not applicable for any column and should be handled in the custom merge logic.Refactors existing column mergers as implementations of
PartialUpsertColumnMerger
.Based on previous feedback comment.
PartialUpsertColumnMerger
to differentiate from row merger:PartialUpsertMerger
Changes based on the last review:
partialUpsertMergerClass
PartialUpsertHandler
to iterate over merger result when partialUpsertMergerClass is provided.Tests:
Unit test: Added
Functional test:
Created a test setup by replicating the quick start example: upsertPartialMeetupRsvp i.e. the partial upsert strategies were implemented as a custom partialUpsertMergerClass:
Table config:
upsertPartialCustomMergerMeetupRsvp_realtime_table_config.json
Observation:
Both partial upsert table with columnar strategies and custom merger class had same data:
upsertPartialMeetupRsvp_quer.txt
upsertPartialCustomMergerMeetupRsvp_query.txt