Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding record reader config/context param to record transformer #12520

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,11 @@ private Map<String, GenericRowFileManager> doMap()
int totalNumRecordReaders = _recordReaderFileConfigs.size();
GenericRow reuse = new GenericRow();
for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
RecordReader recordReader = recordReaderFileConfig.getRecordReader();

// Mapper can terminate midway of reading a file if the intermediate file size has crossed the configured
// threshold. Map phase will continue in the next iteration right where we are leaving off in the current
// iteration.
boolean shouldMapperTerminate =
!completeMapAndTransformRow(recordReader, reuse, observer, count, totalNumRecordReaders);
!completeMapAndTransformRow(recordReaderFileConfig, reuse, observer, count, totalNumRecordReaders);

// Terminate the map phase if intermediate file size has crossed the threshold.
if (shouldMapperTerminate) {
Expand All @@ -164,9 +162,10 @@ private Map<String, GenericRowFileManager> doMap()

// Returns true if the map phase can continue, false if it should terminate based on the configured threshold for
// intermediate file size during map phase.
private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse,
private boolean completeMapAndTransformRow(RecordReaderFileConfig recordReaderFileConfig, GenericRow reuse,
Consumer<Object> observer, int count, int totalCount) throws Exception {
observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
RecordReader recordReader = recordReaderFileConfig.getRecordReader();
while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
reuse = recordReader.next(reuse);
_recordEnricherPipeline.run(reuse);
Expand All @@ -176,13 +175,13 @@ private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow
if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
//noinspection unchecked
for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
GenericRow transformedRow = _recordTransformer.transform(row);
GenericRow transformedRow = _recordTransformer.transform(row, recordReaderFileConfig);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
writeRecord(transformedRow);
}
}
} else {
GenericRow transformedRow = _recordTransformer.transform(reuse);
GenericRow transformedRow = _recordTransformer.transform(reuse, recordReaderFileConfig);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
writeRecord(transformedRow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;

import org.apache.pinot.spi.data.readers.RecordReaderConfig;

/**
* The {@code CompositeTransformer} class performs multiple transforms based on the inner {@link RecordTransformer}s.
Expand Down Expand Up @@ -115,12 +115,19 @@ public CompositeTransformer(List<RecordTransformer> transformers) {

@Nullable
@Override
public GenericRow transform(GenericRow record) {
public GenericRow transform(GenericRow genericRow) {
// No record reader config is supplied on this overload, so delegate to the two-argument variant with a null
// config.
return transform(genericRow, null);
}

@Nullable
@Override
public GenericRow transform(GenericRow record,
RecordReaderConfig recordReaderConfig) {
for (RecordTransformer transformer : _transformers) {
if (!IngestionUtils.shouldIngestRow(record)) {
return record;
}
record = transformer.transform(record);
record = transformer.transform(record, recordReaderConfig);
if (record == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;

import org.apache.pinot.spi.data.readers.RecordReaderConfig;

/**
* The record transformer which takes a {@link GenericRow} and transform it based on some custom rules.
Expand All @@ -43,4 +43,14 @@ default boolean isNoOp() {
*/
@Nullable
GenericRow transform(GenericRow record);

/**
* Transforms a record based on some custom rules using record reader context.
* <p>The default implementation ignores {@code recordReaderConfig} and delegates to
* {@link #transform(GenericRow)}; implementations that need the reader context should override this method.
* @param record Record to transform
* @param recordReaderConfig Record reader config/context passed along by the caller; unused by the default
*                           implementation
* @return Transformed record, or {@code null} if the record does not follow certain rules.
*/
@Nullable
default GenericRow transform(GenericRow record, RecordReaderConfig recordReaderConfig) {
return transform(record);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* RecordReader can be initialized just when it's about to be used, which avoids early/eager
* initialization/memory allocation.
*/
public class RecordReaderFileConfig {
public class RecordReaderFileConfig implements RecordReaderConfig {
public final FileFormat _fileFormat;
public final File _dataFile;
public final Set<String> _fieldsToRead;
Expand Down
Loading