Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record enricher #12243

Merged
merged 14 commits into from
Feb 27, 2024
Merged

Record enricher #12243

merged 14 commits into from
Feb 27, 2024

Conversation

saurabhd336
Copy link
Contributor

@saurabhd336 saurabhd336 commented Jan 9, 2024

Adds a new top level table config called "enrichmentConfigs" similar to transformConfigs. This interface allows record enrichment for both realtime and offline tables. Added an implementation for CLP encoding of columns and another for using standard and groovy transform expressions for enrichment.

Config

"ingestionConfig": {
       "enrichmentConfigs": [
      {
        "enricherType": "clpEnricher",
        "properties": {
           "fields": ["logLine", "anotherLogLine"]
      },
      
      // Transform function / groovy expression based enricher
      {
        "enricher": "generateColumn",
        "properties": {
          "fieldToFunctionMap": {
              "fieldToAdd1": "<transform / groovy function 1>",
               "fieldToAdd2": "<transform / groovy function 2>"
          }
        }
      }
    ],
    ...
}

@codecov-commenter
Copy link

codecov-commenter commented Jan 9, 2024

Codecov Report

Attention: Patch coverage is 19.41176% with 137 lines in your changes are missing coverage. Please review.

Project coverage is 61.69%. Comparing base (38d86b0) to head (501b8f2).
Report is 48 commits behind head on master.

Files Patch % Lines
...lugin/record/enricher/clp/CLPEncodingEnricher.java 0.00% 37 Missing ⚠️
...cord/enricher/function/CustomFunctionEnricher.java 0.00% 17 Missing ⚠️
...not/spi/recordenricher/RecordEnricherPipeline.java 36.00% 14 Missing and 2 partials ⚠️
...not/spi/recordenricher/RecordEnricherRegistry.java 0.00% 16 Missing ⚠️
...richer/function/CustomFunctionEnricherFactory.java 0.00% 14 Missing ⚠️
...ecord/enricher/clp/CLPEncodingEnricherFactory.java 0.00% 8 Missing ⚠️
...t/spi/config/table/ingestion/EnrichmentConfig.java 0.00% 6 Missing ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 42.85% 4 Missing ⚠️
.../plugin/record/enricher/clp/ClpEnricherConfig.java 0.00% 4 Missing ⚠️
...nricher/function/CustomFunctionEnricherConfig.java 0.00% 4 Missing ⚠️
... and 6 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12243      +/-   ##
============================================
- Coverage     61.74%   61.69%   -0.05%     
  Complexity      207      207              
============================================
  Files          2436     2446      +10     
  Lines        133108   133403     +295     
  Branches      20617    20652      +35     
============================================
+ Hits          82191    82307     +116     
- Misses        44876    45047     +171     
- Partials       6041     6049       +8     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.65% <19.41%> (-0.05%) ⬇️
java-21 61.57% <19.41%> (-0.05%) ⬇️
skip-bytebuffers-false 61.68% <19.41%> (-0.05%) ⬇️
skip-bytebuffers-true 61.56% <19.41%> (-0.05%) ⬇️
temurin 61.69% <19.41%> (-0.05%) ⬇️
unittests 61.69% <19.41%> (-0.05%) ⬇️
unittests1 46.83% <10.11%> (-0.06%) ⬇️
unittests2 27.70% <9.41%> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

import org.apache.pinot.spi.data.readers.GenericRow;


public abstract class RecordEnricher {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we put this in segment-spi? do we plan to have other type of enricher plugin in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we'd have support enricher as plugins, I don't see transformers plugin being supported. But yes we may have more enrichers in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for not supporting transformer plugins?

@saurabhd336 saurabhd336 force-pushed the recordEnricher branch 3 times, most recently from 97379ce to d933d6f Compare February 9, 2024 08:24
@saurabhd336 saurabhd336 marked this pull request as ready for review February 9, 2024 09:27
@saurabhd336 saurabhd336 changed the title Record enricher and CLP enricher Record enricher Feb 11, 2024
enricher.init(enrichmentConfig.getProperties());
pipeline.add(enricher);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Failed to instantiate record enricher" + enrichmentConfig.getEnricherClassName(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new RuntimeException("Failed to instantiate record enricher" + enrichmentConfig.getEnricherClassName(),
throw new RuntimeException("Failed to instantiate record enricher: " + enrichmentConfig.getEnricherClassName(),

I believe as is the class name will be concatenated with the word enricher which will make it hard to read these error messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

Comment on lines 40 to 41
private EncodedMessage _clpEncodedMessage;
private MessageEncoder _clpMessageEncoder;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both of these can be made final, which will help enforce that they should only be assigned an instance once each (when the ctor is called).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

Comment on lines 46 to 50
if (StringUtils.isEmpty(concatenatedFieldNames)) {
throw new IllegalArgumentException("Missing required property: " + FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
} else {
_fields = List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's usually more readable to have the success path be the first branch and the failure path be the else branch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

}
}
} catch (Exception e) {
LOGGER.error("Failed to enrich record: {}", record);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One subtle consequence of this pattern we use of modifying the input GenericRow is that if we have a fault during the modification then we're left with a partially updated GenericRow. Hopefully, we always drop the write when a pipeline fails otherwise, we could get very difficult to debug 'broken' data written to a segment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obviously, outside the scope of this PR, but just occurred to me and wanted to write it down to get feedback.

Comment on lines 81 to 94
if (!(value instanceof String)) {
LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
value.getClass().getSimpleName(), key, value);
} else {
String valueAsString = (String) value;
try {
_clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
logtype = _clpEncodedMessage.getLogTypeAsString();
encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
} catch (IOException e) {
LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
e.getMessage());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be more readable with the success path as the first branch:

Suggested change
if (!(value instanceof String)) {
LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
value.getClass().getSimpleName(), key, value);
} else {
String valueAsString = (String) value;
try {
_clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
logtype = _clpEncodedMessage.getLogTypeAsString();
encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
} catch (IOException e) {
LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
e.getMessage());
}
if (value instanceof String) {
try {
_clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
logtype = _clpEncodedMessage.getLogTypeAsString();
encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
} catch (IOException e) {
LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
e.getMessage());
}
} else {
LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
value.getClass().getSimpleName(), key, value);
String valueAsString = (String) value;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

Comment on lines 53 to 55
_fieldToFunctionEvaluator.forEach((field, evaluator) -> {
record.putValue(field, evaluator.evaluate(record));
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is is possible for an exception to get thrown by any of the evaluators? If so, should we catch here (as was done in CLP) and log a message before propagating up?

Comment on lines +162 to +163
init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
new TransformPipeline(config.getTableConfig(), config.getSchema()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the RecordEnricherPipeline evaluate before the TransformPipeline or after? If it's after then we should have it occur after in the ctor parameter list, so that the APIs help re-enforce the logical behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Record enricher would evalute before the transform pipeline

((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
((RecordReaderSegmentCreationDataSource) dataSource).setRecordEnricherPipeline(enricherPipeline);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, if RecordEnricherPipeline is executed before TransformPipeline then lets have it consistently occur before TransformPipeline.


@Override
public void init(Map<String, String> enricherProperties) {
String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if one of the fields contains an illegal character (e.g., fo"o,ba$r)? Are there illegal characters for field names? Or we have a field list like: foo,,,bar,,?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a more natural config class as per @gortiz 's suggestion

private final String _enricherClassName;

@JsonPropertyDescription("Enricher properties")
private final Map<String, String> _properties;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are some examples of properties we might have? The only one I saw in the code was fieldsForClpEncoding for CLPEncodingEnricher. Are there others?

One concern I have is how hard it will be to determine what valid properties are and what required properties are from the Config Schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a concrete config class per enricher type

@gortiz
Copy link
Contributor

gortiz commented Feb 14, 2024

@gortiz
Copy link
Contributor

gortiz commented Feb 14, 2024

Could we use a different configuration model?

We have historically abuse of properties fields. They are easy to implement (just map them to Map<String, String> with Jackson!), but they are a very poor interface. They are very difficult to document and verify and they limit the values we can use as strings.

One of the nicer things in index-spi is that we created a way to escape from the properties field limitation. It would be nice to continue going that way in new features like this one.

My recommendation is, instead of:

    "enrichmentConfigs": [
      {
        "enricherClassName": "org.apache.pinot.segment.local.recordenricher.clp.CLPEncodingEnricher",
        "properties": {
          "fieldsForClpEncoding": "logLine" <--- original record field
        }
      },
      {
        "enricherClassName": "org.apache.pinot.segment.local.recordenricher.function.CustomFunctionEnricher",
        "properties": {
          "newColumnToAddIntoRecord": "<groovy / regular transform function>" <--- original record field
        }
      }
    ],

Configure enrichers as:

    "enrichmentConfigs": [
      {
        "enricher": "CLP", // using a id instead of a class is easier for users and not so difficult for us
        "config": {
           "fields": ["logLine"] // I'm assuming several fields may be enriched
        }
      },
      {
        "enricher": "generatedColumn", // this is the name used in Postgres and MySQL for the same concept.
        "config": {
          "column": "columnName",
          "language": "groovy",
          "expression": "<groovy / regular transform function>"
        }
      }
    ],

@saurabhd336
Copy link
Contributor Author

@gortiz @ege-st Thanks for review. Updated the PR as per the suggestion to move to a more concrete config specification. Please take a look again.

@gortiz
Copy link
Contributor

gortiz commented Feb 15, 2024

Just as a reminder, we need to document this feature!

Copy link
Contributor

@gortiz gortiz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from the comments I already added, LGTM

@saurabhd336 saurabhd336 requested review from gortiz and ege-st February 26, 2024 11:51
@gortiz
Copy link
Contributor

gortiz commented Feb 26, 2024

Another small detail: can we standarise the name of the columns to add? CLP is using fields while groovy uses columns

@saurabhd336 saurabhd336 merged commit 07daa7b into apache:master Feb 27, 2024
19 checks passed
@Jackie-Jiang Jackie-Jiang added feature release-notes Referenced by PRs that need attention when compiling the next release notes ingestion labels Feb 27, 2024
@Jackie-Jiang
Copy link
Contributor

@snleee @swaminathanmanish Did you get a chance to review this?

@saurabhd336 Sorry for the late comments. What is the fundamental difference between record transformer and enricher? Currently we put enricher before transformer, which means the data type might not align with schema when the enricher is applied. Will this cause problems?

@saurabhd336
Copy link
Contributor Author

@Jackie-Jiang The two enrichers that have been added

  1. CLP enricher: Can generate and add the 3 clp specific columns for a given string field in the record. The way this differs from transforms, is that transform functions only allow computing one column value at a time. We'll end up having to run clp transformation 3 times for each individual column. This just makes it simpler. Right now CLP encoding only works for json formatted, stream messages based realtime tables. This makes it possible to use CLP with offline tables / protobuf formatted stream messages etc. (https://docs.pinot.apache.org/basics/data-import/clp)

  2. generateColumn: Enrich using a an existing transform function / groovy script. This is similar to transformations in many ways, but a usecase that couldn't be solved with existing transforms is when we want to generate an array of records using a groovy transform, and then unnest that array to explode into multiple rows. Right now, unnesting of an array field precedes record transformation. Without this enricher, users will have to use an external system to generate the array field and enrich the record before ingesting into pinot.

Other enrichers we may add in the future

  1. Enrich using one or more dimension table lookups for realtime tables. Again this can potentially be achieved using transformations, but we'll need a transform config each for all the columns being enriched.

@saurabhd336
Copy link
Contributor Author

saurabhd336 commented Feb 28, 2024

which means the data type might not align with schema when the enricher is applied. Will this cause problems?

Sorry I don't follow could you elaborate? Post enrichment, the new field being added is as good as having preexisted in the GenericRecord. Even if there is a datatype mismatch, DataTypeTransformer which runs as part of the default transform pipeline can handle the transformation to the right datatype? And if that conversion fails, the user had used a wrong enricher to begin with?

@gortiz
Copy link
Contributor

gortiz commented Feb 28, 2024

From my point of view enrichers and transformers are mostly the same, but enrichers expressiveness are, pun intended, richer than transformers, which are quite limited in the way they can be configured.

@Jackie-Jiang
Copy link
Contributor

The interface for RecordEnricher is almost identical to RecordTransformer, so I believe we should be able to implement the enricher as a special transformer.
I do see the limitation of not able to expend one record into multiple with RecordTransformer because the logic is handled before invoking record transformation, but we should be able to handle that by improving TransformPipeline to check GenericRow.MULTIPLE_RECORDS_KEY for the result of each transformation.
My concern of introducing enricher as a separate module is that user/develop might get confused of when to use enricher vs transformer as they are essentially doing the same thing. It would be great if we can unify them into the same module.

As for the data type, don't worry about that as ExpressionTransformer is also applied before DataTypeTransformer.

@gortiz
Copy link
Contributor

gortiz commented Feb 29, 2024

Do you suggest to rollback the change and modify TransformPipeline to support the same semantics defined in ExpressionTransformer? We are still in time to do that.

@Jackie-Jiang
Copy link
Contributor

Yes. I prefer keeping one single interface for enrichment/transformation on records, unless there are fundamental differences between them. @saurabhd336 what do you think?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature ingestion release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants