PR #13086: Refactored RecordTransformer & merged RecordEnricher (#13049)

Status: Closed. Wants to merge 14 commits.
@@ -62,12 +62,11 @@

public class TableConfigUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigUtils.class);
private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing";

private TableConfigUtils() {
}

private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing";

public static TableConfig fromZNRecord(ZNRecord znRecord)
throws IOException {
Map<String, String> simpleFields = znRecord.getSimpleFields();
@@ -80,8 +79,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
Preconditions.checkState(tableType != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY);

String validationConfigString = simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY);
Preconditions
.checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.VALIDATION_CONFIG_KEY);
Preconditions.checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE,
TableConfig.VALIDATION_CONFIG_KEY);
SegmentsValidationAndRetentionConfig validationConfig =
JsonUtils.stringToObject(validationConfigString, SegmentsValidationAndRetentionConfig.class);

@@ -90,8 +89,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
TenantConfig tenantConfig = JsonUtils.stringToObject(tenantConfigString, TenantConfig.class);

String indexingConfigString = simpleFields.get(TableConfig.INDEXING_CONFIG_KEY);
Preconditions
.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.INDEXING_CONFIG_KEY);
Preconditions.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE,
TableConfig.INDEXING_CONFIG_KEY);
IndexingConfig indexingConfig = JsonUtils.stringToObject(indexingConfigString, IndexingConfig.class);

String customConfigString = simpleFields.get(TableConfig.CUSTOM_CONFIG_KEY);
@@ -180,14 +179,16 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
String instancePartitionsMapString = simpleFields.get(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY);
if (instancePartitionsMapString != null) {
instancePartitionsMap = JsonUtils.stringToObject(instancePartitionsMapString,
new TypeReference<Map<InstancePartitionsType, String>>() { });
new TypeReference<Map<InstancePartitionsType, String>>() {
});
}

Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap = null;
String segmentAssignmentConfigMapString = simpleFields.get(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY);
if (segmentAssignmentConfigMapString != null) {
segmentAssignmentConfigMap = JsonUtils.stringToObject(segmentAssignmentConfigMapString,
new TypeReference<Map<String, SegmentAssignmentConfig>>() { });
new TypeReference<Map<String, SegmentAssignmentConfig>>() {
});
}

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
@@ -228,8 +229,8 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
}
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
if (instanceAssignmentConfigMap != null) {
simpleFields
.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(instanceAssignmentConfigMap));
simpleFields.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY,
JsonUtils.objectToString(instanceAssignmentConfigMap));
}
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList != null) {
@@ -263,11 +264,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
simpleFields.put(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY,
JsonUtils.objectToString(tableConfig.getInstancePartitionsMap()));
}
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap =
tableConfig.getSegmentAssignmentConfigMap();
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
if (segmentAssignmentConfigMap != null) {
simpleFields
.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(segmentAssignmentConfigMap));
simpleFields.put(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY,
JsonUtils.objectToString(segmentAssignmentConfigMap));
}

ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
@@ -443,8 +443,8 @@ public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig
*/
public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
return hasPreConfiguredInstancePartitions(tableConfig)
&& tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType);
return hasPreConfiguredInstancePartitions(tableConfig) && tableConfig.getInstancePartitionsMap()
.containsKey(instancePartitionsType);
}

/**
@@ -53,7 +53,6 @@
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -78,7 +77,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
private String _outputDirURI;
private Schema _schema;
private Set<String> _fieldsToRead;
private RecordEnricherPipeline _recordEnricherPipeline;
private RecordTransformer _recordEnricherPipeline;
Contributor: After changing RecordEnricher into RecordTransformer, we should be able to put the RecordEnricher in front of the existing RecordTransformers as part of the default CompositeTransformer.
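A minimal sketch of that direction, assuming a helper for building enrichers and an accessor for the existing default transformers (this is not the merged code):

// Inside CompositeTransformer, hypothetically:
public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) {
  List<RecordTransformer> transformers = new ArrayList<>();
  // Assumed helper: instantiate enrichers from
  // tableConfig.getIngestionConfig().getEnrichmentConfigs().
  transformers.addAll(createEnrichers(tableConfig));
  // Existing default transformers (expression, filter, data type, null value, ...).
  transformers.addAll(getDefaultTransformers(tableConfig, schema));
  return new CompositeTransformer(transformers);
}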

private RecordTransformer _recordTransformer;

private File _stagingDir;
@@ -139,7 +138,7 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat

_schema = schema;
_fieldsToRead = _schema.getColumnNames();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig);
_recordEnricherPipeline = RecordTransformer.fromTableConfig(_tableConfig);
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
_avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
_reusableRecord = new GenericData.Record(_avroSchema);
@@ -55,6 +55,7 @@
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -76,7 +77,6 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -275,7 +275,7 @@ public void deleteSegmentFile() {
private final int _partitionGroupId;
private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
final String _clientId;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final RecordTransformer _recordEnricherPipeline;
Contributor: Same here, we should be able to integrate it into the TransformPipeline.

Collaborator (author): Got it, I will take a look at TransformPipeline. I didn't know this class existed; I will modify accordingly.
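A rough sketch of that integration, with assumed field and method shapes (TransformPipeline's real API may differ):

// Hypothetical: TransformPipeline owning the enricher step, so
// RealtimeSegmentDataManager holds a single pipeline object.
public class TransformPipeline {
  private final RecordTransformer _enricher;
  private final CompositeTransformer _transformer;

  public TransformPipeline(TableConfig tableConfig, Schema schema) {
    // fromTableConfig(...) is the factory introduced by this PR.
    _enricher = RecordTransformer.fromTableConfig(tableConfig);
    _transformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
  }

  public GenericRow transform(GenericRow row) {
    // Enrich first so downstream transformers see the enriched columns.
    return _transformer.transform(_enricher.transform(row));
  }
}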

private final TransformPipeline _transformPipeline;
private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _partitionMetadataProvider = null;
@@ -1514,10 +1514,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
}

try {
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
_recordEnricherPipeline = RecordTransformer.fromTableConfig(tableConfig);
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e));
new SegmentErrorInfo(now(), "Failed to initialize the Record Transformer pipeline", e));
throw e;
}
_transformPipeline = new TransformPipeline(tableConfig, schema);
@@ -43,7 +43,6 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -290,7 +289,7 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
RecordEnricherPipeline.getPassThroughPipeline(),
RecordTransformer.getPassThroughPipeline(),
TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
@@ -48,7 +48,6 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +68,7 @@ public class SegmentMapper {
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
private final int _numSortFields;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final RecordTransformer _recordEnricherPipeline;
private final CompositeTransformer _recordTransformer;
private final TimeHandler _timeHandler;
private final Partitioner[] _partitioners;
@@ -94,7 +93,7 @@ public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
_fieldSpecs = pair.getLeft();
_numSortFields = pair.getRight();
_includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
_recordEnricherPipeline = RecordTransformer.fromTableConfig(tableConfig);
_recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
_timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
@@ -16,45 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.recordenricher;
package org.apache.pinot.plugin.record.enricher;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class RecordEnricherRegistry {
Contributor: I think we should be able to remove this registry and follow the existing way of creating RecordTransformer. The same applies to other places where two pipelines exist.
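A sketch of the registry-free path, reusing the EnrichmentConfig accessors already visible in this PR (the method shape and type dispatch are assumptions):

// Hypothetical: construct enricher transformers directly from the table
// config, mirroring how the other RecordTransformers are created.
public static List<RecordTransformer> createEnrichers(TableConfig tableConfig)
    throws IOException {
  List<RecordTransformer> enrichers = new ArrayList<>();
  IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
  if (ingestionConfig != null && ingestionConfig.getEnrichmentConfigs() != null) {
    for (EnrichmentConfig config : ingestionConfig.getEnrichmentConfigs()) {
      if ("clpEnricher".equals(config.getEnricherType())) {
        enrichers.add(new CLPEncodingEnricher(config.getProperties()));
      } else {
        throw new IllegalArgumentException(
            "No record enricher found for type: " + config.getEnricherType());
      }
    }
  }
  return enrichers;
}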

private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class);
private static final Map<String, RecordEnricherFactory> RECORD_ENRICHER_FACTORY_MAP = new HashMap<>();
private static final Map<String, RecordTransformer> RECORD_ENRICHER_FACTORY_MAP = new HashMap<>();

private RecordEnricherRegistry() {
}

public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig,
RecordEnricherValidationConfig config) {
if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) {
throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType());
}

RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType())
.validateEnrichmentConfig(enrichmentConfig.getProperties(), config);
}

public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig)
throws IOException {
if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) {
throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType());
}
return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType())
.createEnricher(enrichmentConfig.getProperties());
public static Map<String, RecordTransformer> getRecordEnricherFactoryMap() {
return RECORD_ENRICHER_FACTORY_MAP;
}

static {
for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) {
for (RecordTransformer recordEnricherFactory : ServiceLoader.load(RecordTransformer.class)) {
LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType());
RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory);
}
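For the ServiceLoader lookup above to find an implementation at runtime, each jar still needs a service descriptor; a sketch, with the implementation's package assumed (the interface name matches the import in this file):

# META-INF/services/org.apache.pinot.segment.local.recordtransformer.RecordTransformer
# One fully qualified implementation class per line; package assumed for illustration.
org.apache.pinot.plugin.record.enricher.CLPEncodingEnricher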
@@ -24,8 +24,8 @@
import com.yscope.clp.compressorfrontend.MessageEncoder;
import java.io.IOException;
import java.util.List;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.slf4j.Logger;
@@ -39,11 +39,12 @@
* 2. 'x_dictVars' - The dictionary variables of the encoded message
* 3. 'x_encodedVars' - The encoded variables of the encoded message
*/
public class CLPEncodingEnricher implements RecordEnricher {
public class CLPEncodingEnricher implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
private final ClpEnricherConfig _config;
private final EncodedMessage _clpEncodedMessage;
private final MessageEncoder _clpMessageEncoder;
private static final String ENRICHER_TYPE = "clpEnricher";

public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException {
_config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class);
@@ -58,7 +59,7 @@ public List<String> getInputColumns() {
}

@Override
public void enrich(GenericRow record) {
public GenericRow transform(GenericRow record) {
try {
for (String field : _config.getFields()) {
Object value = record.getValue(field);
@@ -69,6 +70,7 @@
} catch (Exception e) {
LOGGER.error("Failed to enrich record: {}", record);
}
return record;
}

private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) {
@@ -97,4 +99,24 @@ private void enrichWithClpEncodedFields(String key, Object value, GenericRow to)
to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);
}

@Override
public String getEnricherType() {
return ENRICHER_TYPE;
}

@Override
public RecordTransformer createEnricher(JsonNode enricherProps)
throws IOException {
return new CLPEncodingEnricher(enricherProps);
}

@Override
public void validateEnrichmentConfig(JsonNode enricherProps, boolean validationConfig) {
try {
ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to parse clp enricher config", e);
}
}
}
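As a usage sketch of the refactored enricher (the "fields" property key, the enricher's package, and the companion-column names are assumptions based on this diff and the class Javadoc):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pinot.spi.data.readers.GenericRow;
// Import for CLPEncodingEnricher omitted; its package is not shown in this diff.

public class ClpEnricherDemo {
  public static void main(String[] args) throws Exception {
    JsonNode props = new ObjectMapper().readTree("{\"fields\": [\"message\"]}");
    GenericRow row = new GenericRow();
    row.putValue("message", "Task task_12 assigned to container_56");

    // After this PR, enrichment runs through the generic RecordTransformer
    // contract instead of a dedicated enrich() call.
    row = new CLPEncodingEnricher(props).transform(row);
    // The row now also carries the CLP companion columns described in the
    // class Javadoc (logtype, dictionary vars, encoded vars) for "message".
  }
}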

This file was deleted.
