Record enricher #12243
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12243      +/-   ##
============================================
- Coverage     61.74%   61.69%   -0.05%
  Complexity      207      207
============================================
  Files          2436     2446      +10
  Lines        133108   133403     +295
  Branches      20617    20652      +35
============================================
+ Hits          82191    82307     +116
- Misses        44876    45047     +171
- Partials       6041     6049       +8
import org.apache.pinot.spi.data.readers.GenericRow;

public abstract class RecordEnricher {
should we put this in segment-spi? do we plan to have other type of enricher plugin in the future?
I'm not sure we'd support enrichers as plugins; I don't see transformer plugins being supported. But yes, we may have more enrichers in the future.
What's the reason for not supporting transformer plugins?
97379ce
to
d933d6f
    enricher.init(enrichmentConfig.getProperties());
    pipeline.add(enricher);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    throw new RuntimeException("Failed to instantiate record enricher" + enrichmentConfig.getEnricherClassName(),
- throw new RuntimeException("Failed to instantiate record enricher" + enrichmentConfig.getEnricherClassName(),
+ throw new RuntimeException("Failed to instantiate record enricher: " + enrichmentConfig.getEnricherClassName(),
I believe that, as is, the class name will be concatenated directly onto the word enricher, which will make these error messages hard to read.
Ack
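To make the effect of the suggested separator concrete, here is a minimal sketch of reflective instantiation with a readable error message. The class `EnricherLoader` and its method are hypothetical and not part of this PR; only the message text comes from the suggestion above.

```java
// Hypothetical helper, not part of the PR: instantiates a class by name and
// reports failures with the class name clearly separated from the message.
final class EnricherLoader {
  private EnricherLoader() {
  }

  static Object instantiate(String className) {
    try {
      // Requires a public no-argument constructor on the target class.
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // The ": " separator keeps the class name from running into the word "enricher".
      throw new RuntimeException("Failed to instantiate record enricher: " + className, e);
    }
  }
}
```

With a missing class such as `no.such.Enricher`, the message reads `Failed to instantiate record enricher: no.such.Enricher` rather than `...record enricherno.such.Enricher`.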
private EncodedMessage _clpEncodedMessage;
private MessageEncoder _clpMessageEncoder;
I think both of these can be made final, which will help enforce that they should only be assigned an instance once each (when the ctor is called).
Ack
if (StringUtils.isEmpty(concatenatedFieldNames)) {
  throw new IllegalArgumentException("Missing required property: " + FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
} else {
  _fields = List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
}
It's usually more readable to have the success path be the first branch and the failure path be the else branch.
Ack
    }
  }
} catch (Exception e) {
  LOGGER.error("Failed to enrich record: {}", record);
One subtle consequence of this pattern of modifying the input GenericRow in place is that, if a fault occurs during the modification, we're left with a partially updated GenericRow. Hopefully we always drop the write when a pipeline fails; otherwise, we could end up with 'broken' data written to a segment, which would be very difficult to debug.
Obviously, outside the scope of this PR, but just occurred to me and wanted to write it down to get feedback.
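One way to avoid a partially updated row, sketched here purely for discussion, is to compute all new values into a staging map and copy them into the record only after every enricher has succeeded. This is not what the PR does: a plain `Map` stands in for GenericRow, and `StagedEnrichment` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: a Map<String, Object> stands in for GenericRow.
final class StagedEnrichment {
  private StagedEnrichment() {
  }

  /** Applies every enricher against the untouched record, then commits all results at once. */
  static void enrich(Map<String, Object> record,
      Map<String, Function<Map<String, Object>, Object>> enrichers) {
    Map<String, Object> staged = new HashMap<>();
    for (Map.Entry<String, Function<Map<String, Object>, Object>> entry : enrichers.entrySet()) {
      // A failure here propagates before the record itself has been modified.
      staged.put(entry.getKey(), entry.getValue().apply(record));
    }
    // Reached only if no enricher threw, so the record is never left half-updated.
    record.putAll(staged);
  }
}
```

The trade-off is that each enricher sees only the original record, not the output of enrichers that ran before it, so this shape would not fit a pipeline whose stages depend on one another.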
if (!(value instanceof String)) {
  LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
      value.getClass().getSimpleName(), key, value);
} else {
  String valueAsString = (String) value;
  try {
    _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
    logtype = _clpEncodedMessage.getLogTypeAsString();
    encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
    dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
  } catch (IOException e) {
    LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
        e.getMessage());
  }
I think this would be more readable with the success path as the first branch:
if (value instanceof String) {
  String valueAsString = (String) value;
  try {
    _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
    logtype = _clpEncodedMessage.getLogTypeAsString();
    encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
    dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
  } catch (IOException e) {
    LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
        e.getMessage());
  }
} else {
  LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
      value.getClass().getSimpleName(), key, value);
}
Ack
_fieldToFunctionEvaluator.forEach((field, evaluator) -> {
  record.putValue(field, evaluator.evaluate(record));
});
Is it possible for an exception to be thrown by any of the evaluators? If so, should we catch it here (as was done in CLP) and log a message before propagating it up?
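A minimal sketch of what catch, log, and rethrow could look like for the loop above. It is an illustration under assumptions: a `Map` stands in for GenericRow, `java.util.function.Function` stands in for the function evaluator, and `java.util.logging` replaces the project's logger so the example stays self-contained. `LoggingFunctionEnricher` is a hypothetical name.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: stand-in types replace GenericRow and the PR's evaluator type.
final class LoggingFunctionEnricher {
  private static final Logger LOGGER = Logger.getLogger(LoggingFunctionEnricher.class.getName());

  private final Map<String, Function<Map<String, Object>, Object>> _fieldToEvaluator;

  LoggingFunctionEnricher(Map<String, Function<Map<String, Object>, Object>> fieldToEvaluator) {
    _fieldToEvaluator = fieldToEvaluator;
  }

  void enrich(Map<String, Object> record) {
    _fieldToEvaluator.forEach((field, evaluator) -> {
      try {
        record.put(field, evaluator.apply(record));
      } catch (RuntimeException e) {
        // Log which field failed, then rethrow so the caller can decide whether to drop the record.
        LOGGER.log(Level.SEVERE, "Failed to evaluate enrichment for field: " + field, e);
        throw e;
      }
    });
  }
}
```

Rethrowing the original exception keeps the caller's error handling unchanged while adding the field name to the logs.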
init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
    new TransformPipeline(config.getTableConfig(), config.getSchema()));
Does the RecordEnricherPipeline evaluate before the TransformPipeline or after? If it's after, then we should have it occur after in the ctor parameter list, so that the APIs help reinforce the logical behavior.
The record enricher would evaluate before the transform pipeline.
((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
((RecordReaderSegmentCreationDataSource) dataSource).setRecordEnricherPipeline(enricherPipeline);
Likewise, if RecordEnricherPipeline is executed before TransformPipeline, then let's have it consistently occur before TransformPipeline.
@Override
public void init(Map<String, String> enricherProperties) {
  String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
What happens if one of the fields contains an illegal character (e.g., fo"o,ba$r)? Are there illegal characters for field names? Or what if we have a field list like foo,,,bar,,?
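For the empty-token case, note that Java's `String.split(",")` on `foo,,,bar,,` drops the trailing empty strings but keeps the interior ones, so a list built directly from the split result would contain empty field names. Below is a sketch of a stricter parser; `FieldListParser` is a hypothetical helper, and it does not validate characters because the rules for legal field names are not stated in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the PR: parses a comma-separated field list.
final class FieldListParser {
  private static final String SEPARATOR = ",";

  private FieldListParser() {
  }

  static List<String> parse(String concatenatedFieldNames) {
    if (concatenatedFieldNames == null || concatenatedFieldNames.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing required field list");
    }
    // A limit of -1 keeps trailing empty tokens, so every empty token is filtered
    // explicitly below instead of relying on split()'s trailing-token behavior.
    List<String> fields = Arrays.stream(concatenatedFieldNames.split(SEPARATOR, -1))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .collect(Collectors.toList());
    if (fields.isEmpty()) {
      throw new IllegalArgumentException("Field list contains no field names: " + concatenatedFieldNames);
    }
    return fields;
  }
}
```

Here `FieldListParser.parse("foo,,,bar,,")` yields the two names `foo` and `bar`, and an input made up only of separators is rejected.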
Moved to a more natural config class as per @gortiz's suggestion
private final String _enricherClassName;

@JsonPropertyDescription("Enricher properties")
private final Map<String, String> _properties;
What are some examples of properties we might have? The only one I saw in the code was fieldsForClpEncoding for CLPEncodingEnricher. Are there others?
One concern I have is how hard it will be to determine, from the config schema, which properties are valid and which are required.
Moved to a concrete config class per enricher type
Could we use a different configuration model? We have historically abuse of
One of the nicer things in
My recommendation is, instead of:
"enrichmentConfigs": [
  {
    "enricherClassName": "org.apache.pinot.segment.local.recordenricher.clp.CLPEncodingEnricher",
    "properties": {
      "fieldsForClpEncoding": "logLine" <--- original record field
    }
  },
  {
    "enricherClassName": "org.apache.pinot.segment.local.recordenricher.function.CustomFunctionEnricher",
    "properties": {
      "newColumnToAddIntoRecord": "<groovy / regular transform function>" <--- original record field
    }
  }
],
Configure enrichers as:
"enrichmentConfigs": [
  {
    "enricher": "CLP", // using an id instead of a class is easier for users and not so difficult for us
    "config": {
      "fields": ["logLine"] // I'm assuming several fields may be enriched
    }
  },
  {
    "enricher": "generatedColumn", // this is the name used in Postgres and MySQL for the same concept.
    "config": {
      "column": "columnName",
      "language": "groovy",
      "expression": "<groovy / regular transform function>"
    }
  }
],
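To illustrate how the proposed shape could make valid and required properties discoverable, here is a sketch of an id-based registry that maps each enricher id to a typed config parser. Only the ids (`CLP`, `generatedColumn`) and the config keys come from the proposal above. The class names and the registry itself are assumptions for illustration, not the API that was merged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: class names are hypothetical; ids and keys mirror the proposed JSON.
final class EnricherConfigRegistry {
  static final class ClpConfig {
    final List<String> fields;

    ClpConfig(List<String> fields) {
      this.fields = fields;
    }

    static ClpConfig fromMap(Map<String, Object> config) {
      Object fields = config.get("fields");
      if (!(fields instanceof List) || ((List<?>) fields).isEmpty()) {
        throw new IllegalArgumentException("CLP enricher requires a non-empty 'fields' list");
      }
      List<String> names = new ArrayList<>();
      for (Object field : (List<?>) fields) {
        names.add(String.valueOf(field));
      }
      return new ClpConfig(names);
    }
  }

  static final class GeneratedColumnConfig {
    final String column;
    final String language;
    final String expression;

    GeneratedColumnConfig(String column, String language, String expression) {
      this.column = column;
      this.language = language;
      this.expression = expression;
    }

    static GeneratedColumnConfig fromMap(Map<String, Object> config) {
      return new GeneratedColumnConfig(required(config, "column"), required(config, "language"),
          required(config, "expression"));
    }
  }

  // Maps a short enricher id to the parser that validates and types its config.
  private static final Map<String, Function<Map<String, Object>, Object>> PARSERS = new HashMap<>();

  static {
    PARSERS.put("CLP", ClpConfig::fromMap);
    PARSERS.put("generatedColumn", GeneratedColumnConfig::fromMap);
  }

  private EnricherConfigRegistry() {
  }

  static String required(Map<String, Object> config, String key) {
    Object value = config.get(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing required property: " + key);
    }
    return String.valueOf(value);
  }

  static Object parse(String enricherId, Map<String, Object> config) {
    Function<Map<String, Object>, Object> parser = PARSERS.get(enricherId);
    if (parser == null) {
      throw new IllegalArgumentException("Unknown enricher: " + enricherId);
    }
    return parser.apply(config);
  }
}
```

Because each enricher owns a concrete config class, a missing `column` or an empty `fields` list fails at parse time with a message naming the property, instead of surfacing later as a null lookup in a string map.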
f90d074
to
2e3c5da
pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java
pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java
pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactoryInterface.java
Outdated
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java
Just as a reminder, we need to document this feature!
Apart from the comments I already added, LGTM
Another small detail: can we standardise the name of the columns to add? CLP is using
@snleee @swaminathanmanish Did you get a chance to review this? @saurabhd336 Sorry for the late comments. What is the fundamental difference between a record transformer and an enricher? Currently we put the enricher before the transformer, which means the data type might not align with the schema when the enricher is applied. Will this cause problems?
@Jackie-Jiang The two enrichers that have been added
Other enrichers we may add in the future
Sorry, I don't follow; could you elaborate? Post enrichment, the new field being added is as good as having preexisted in the GenericRecord. Even if there is a datatype mismatch, DataTypeTransformer, which runs as part of the default transform pipeline, can handle the transformation to the right datatype? And if that conversion fails, the user had used a wrong enricher to begin with?
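The ordering argument can be sketched as follows: the enrichment step adds a field as a string, and a later schema-driven conversion step turns it into the column's declared type, failing loudly if the value cannot be converted. This is a stand-in for illustration, not Pinot's DataTypeTransformer. The field names `rawStatus` and `statusCode` and the class `EnrichThenConvert` are hypothetical, and a `Map` replaces the record type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: convertTypes is a simplified stand-in for a schema-driven type conversion step.
final class EnrichThenConvert {
  private EnrichThenConvert() {
  }

  /** Converts String values to the type each column's parser produces. */
  static void convertTypes(Map<String, Object> record, Map<String, Function<String, Object>> columnParsers) {
    columnParsers.forEach((column, parser) -> {
      Object value = record.get(column);
      if (value instanceof String) {
        // A value that cannot be parsed fails here, e.g. with NumberFormatException.
        record.put(column, parser.apply((String) value));
      }
    });
  }

  static Map<String, Object> ingest(Map<String, Object> input) {
    Map<String, Object> record = new HashMap<>(input);
    // Step 1 (enrichment): add a new field; at this point it is still a String.
    record.put("statusCode", String.valueOf(record.get("rawStatus")).trim());
    // Step 2 (type conversion): runs after enrichment, so it also covers the new field.
    Map<String, Function<String, Object>> columnParsers = new HashMap<>();
    columnParsers.put("statusCode", Integer::valueOf);
    convertTypes(record, columnParsers);
    return record;
  }
}
```

Under these assumptions, a type mismatch introduced by an enricher is either corrected by the later conversion step or reported as a conversion failure, which matches the behavior described in the comment above.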
From my point of view, enrichers and transformers are mostly the same, but enrichers' expressiveness is, pun intended, richer than that of transformers, which are quite limited in the way they can be configured.
The interface for As for the data type, don't worry about that as
Do you suggest to rollback the change and modify
Yes. I prefer keeping one single interface for enrichment/transformation on records, unless there are fundamental differences between them. @saurabhd336 what do you think?
Adds a new top-level table config called "enrichmentConfigs", similar to transformConfigs. This interface allows record enrichment for both realtime and offline tables. Adds one implementation for CLP encoding of columns and another that uses standard and Groovy transform expressions for enrichment.
Config