-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#13049 Refactored RecordTransformer & merged RecordEnricher #13086
Changes from all commits
d066872
ee42776
d4f3cd6
b4eb21b
0e02293
12e9a87
f4424da
e261dae
4a18a04
b328fec
cfdf2c9
4202736
437c840
7c7f3cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,7 @@ | |
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable; | ||
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter; | ||
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; | ||
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; | ||
import org.apache.pinot.segment.local.segment.creator.TransformPipeline; | ||
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; | ||
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; | ||
|
@@ -76,7 +77,6 @@ | |
import org.apache.pinot.spi.data.readers.GenericRow; | ||
import org.apache.pinot.spi.metrics.PinotMeter; | ||
import org.apache.pinot.spi.plugin.PluginManager; | ||
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; | ||
import org.apache.pinot.spi.stream.ConsumerPartitionState; | ||
import org.apache.pinot.spi.stream.LongMsgOffset; | ||
import org.apache.pinot.spi.stream.MessageBatch; | ||
|
@@ -275,7 +275,7 @@ public void deleteSegmentFile() { | |
private final int _partitionGroupId; | ||
private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus; | ||
final String _clientId; | ||
private final RecordEnricherPipeline _recordEnricherPipeline; | ||
private final RecordTransformer _recordEnricherPipeline; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, we should be able to integrate it into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. I will take a look into TransformPipeline. Didn't know this class existed. Will modify according to it. |
||
private final TransformPipeline _transformPipeline; | ||
private PartitionGroupConsumer _partitionGroupConsumer = null; | ||
private StreamMetadataProvider _partitionMetadataProvider = null; | ||
|
@@ -1514,10 +1514,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf | |
} | ||
|
||
try { | ||
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig); | ||
_recordEnricherPipeline = RecordTransformer.fromTableConfig(tableConfig); | ||
} catch (Exception e) { | ||
_realtimeTableDataManager.addSegmentError(_segmentNameStr, | ||
new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e)); | ||
new SegmentErrorInfo(now(), "Failed to initialize the Record Transformer pipeline", e)); | ||
throw e; | ||
} | ||
_transformPipeline = new TransformPipeline(tableConfig, schema); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,45 +16,29 @@ | |
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pinot.spi.recordenricher; | ||
package org.apache.pinot.plugin.record.enricher; | ||
|
||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.ServiceLoader; | ||
import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; | ||
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
public class RecordEnricherRegistry { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should be able to remove this registry and follow the existing way of creating |
||
private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class); | ||
private static final Map<String, RecordEnricherFactory> RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); | ||
private static final Map<String, RecordTransformer> RECORD_ENRICHER_FACTORY_MAP = new HashMap<>(); | ||
|
||
private RecordEnricherRegistry() { | ||
} | ||
|
||
public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig, | ||
RecordEnricherValidationConfig config) { | ||
if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { | ||
throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); | ||
} | ||
|
||
RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) | ||
.validateEnrichmentConfig(enrichmentConfig.getProperties(), config); | ||
} | ||
|
||
public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig) | ||
throws IOException { | ||
if (!RECORD_ENRICHER_FACTORY_MAP.containsKey(enrichmentConfig.getEnricherType())) { | ||
throw new IllegalArgumentException("No record enricher found for type: " + enrichmentConfig.getEnricherType()); | ||
} | ||
return RECORD_ENRICHER_FACTORY_MAP.get(enrichmentConfig.getEnricherType()) | ||
.createEnricher(enrichmentConfig.getProperties()); | ||
public static Map<String, RecordTransformer> getRecordEnricherFactoryMap() { | ||
return RECORD_ENRICHER_FACTORY_MAP; | ||
} | ||
|
||
static { | ||
for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) { | ||
for (RecordTransformer recordEnricherFactory : ServiceLoader.load(RecordTransformer.class)) { | ||
LOGGER.info("Registered record enricher factory type: {}", recordEnricherFactory.getEnricherType()); | ||
RECORD_ENRICHER_FACTORY_MAP.put(recordEnricherFactory.getEnricherType(), recordEnricherFactory); | ||
} | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After changing
RecordEnricher
intoRecordTransformer
, we should be able to put theRecordEnricher
in front of the existingRecordTransformer
s as part of the defaultCompositeTransformer