Enable complexType handling in SegmentProcessorFramework (#12942)
* Enable complexType handling in SegmentProcessorFramework
swaminathanmanish authored Apr 18, 2024
1 parent 02b1e3d commit e057129
Showing 2 changed files with 52 additions and 0 deletions.
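
In short, this change wires complex-type handling into SegmentProcessorFramework: when the table's IngestionConfig carries a ComplexTypeConfig, a ComplexTypeTransformer is composed into the record-transformer chain, so nested map values are flattened into delimiter-joined columns such as campaign.inner1.inner2. The minimal usage sketch below mirrors the new test in this diff; the class name ComplexTypeProcessingSketch is purely illustrative and the framework package names are assumed rather than taken from this diff.

// Sketch only -- mirrors testSegmentGenerationWithComplexType below; not part of the commit.
import java.io.File;
import java.util.List;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;

public class ComplexTypeProcessingSketch {
  public static List<File> process(TableConfig tableConfig, Schema schema,
      List<RecordReader> recordReaders, File workingDir)
      throws Exception {
    // Flatten nested maps into "."-joined columns, e.g. "campaign.inner1.inner2".
    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setComplexTypeConfig(new ComplexTypeConfig(null, ".", null, null));
    tableConfig.setIngestionConfig(ingestionConfig);

    SegmentProcessorConfig config = new SegmentProcessorConfig.Builder()
        .setTableConfig(tableConfig)
        .setSchema(schema)  // the schema must declare the flattened column names
        .build();
    return new SegmentProcessorFramework(recordReaders, config, workingDir).process();
  }
}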
SegmentProcessorFrameworkTest.java
@@ -25,7 +25,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
@@ -43,6 +45,8 @@
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
@@ -52,6 +56,7 @@
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -73,6 +78,8 @@ public class SegmentProcessorFrameworkTest {
private List<RecordReader> _singleSegment;
private List<RecordReader> _multipleSegments;
private List<RecordReader> _multiValueSegments;
private List<RecordReader> _recordReaderWithComplexType;


private TableConfig _tableConfig;
private TableConfig _tableConfigNullValueEnabled;
@@ -113,6 +120,8 @@ public void setup()

_schema =
new Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("campaign", DataType.STRING, "")
.addSingleValueDimension("campaign.inner1", DataType.STRING)
.addSingleValueDimension("campaign.inner1.inner2", DataType.STRING)
// NOTE: Intentionally put 1000 as default value to test skipping null values during rollup
.addMetric("clicks", DataType.INT, 1000)
.addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
@@ -127,6 +136,7 @@
_multipleSegments = createInputSegments(new File(TEMP_DIR, "multiple_segments"), _rawData, 3, _schema);
_multiValueSegments =
createInputSegments(new File(TEMP_DIR, "multi_value_segment"), _rawDataMultiValue, 1, _schemaMV);
_recordReaderWithComplexType = createRecordReaderWithComplexType();
}

private List<RecordReader> createInputSegments(File inputDir, List<Object[]> rawData, int numSegments, Schema schema)
@@ -168,6 +178,22 @@ private List<RecordReader> createInputSegments(File inputDir, List<Object[]> raw
return segmentRecordReaders;
}

private List<RecordReader> createRecordReaderWithComplexType() {
GenericRow genericRow = new GenericRow();
genericRow.putValue("a", 1L);
Map<String, Object> map1 = new HashMap<>();
genericRow.putValue("campaign", map1);
map1.put("inner", "innerv");
Map<String, Object> innerMap1 = new HashMap<>();
innerMap1.put("inner2", "inner2v");

map1.put("inner1", innerMap1);
Map<String, Object> map2 = new HashMap<>();
map2.put("c", 3);
genericRow.putValue("map2", map2);
return List.of(new GenericRowRecordReader(List.of(genericRow)));
}

private GenericRow getGenericRow(Object[] rawRow) {
GenericRow row = new GenericRow();
row.putValue("campaign", rawRow[0]);
@@ -222,6 +248,28 @@ public void testRecordReaderFileConfigInit() throws Exception {
assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(), true);
}

@Test
public void testSegmentGenerationWithComplexType() throws Exception {
File workingDir = new File(TEMP_DIR, "single_segment_complex_type_output");
FileUtils.forceMkdir(workingDir);
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setComplexTypeConfig(
new ComplexTypeConfig(null, ".", null, null));
_tableConfig.setIngestionConfig(ingestionConfig);
// Default configs
SegmentProcessorConfig config =
new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
SegmentProcessorFramework framework =
new SegmentProcessorFramework(_recordReaderWithComplexType, config, workingDir);
List<File> outputSegments = framework.process();
ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
// Pick the column created from complex type
ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign.inner1.inner2");
// Verify we see a specific value parsed from the complexType
Assert.assertEquals(campaignMetadata.getMinValue().compareTo("inner2v"), 0);
}

@Test
public void testSingleSegment()
throws Exception {
CompositeTransformer.java
@@ -104,6 +104,10 @@ public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig
public static CompositeTransformer composeAllTransformers(List<RecordTransformer> customTransformers,
TableConfig tableConfig, Schema schema) {
List<RecordTransformer> allTransformers = new ArrayList<>(customTransformers);
ComplexTypeTransformer complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
if (complexTypeTransformer != null) {
allTransformers.add(complexTypeTransformer);
}
allTransformers.addAll(getDefaultTransformers(tableConfig, schema));
return new CompositeTransformer(allTransformers);
}
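A note on the composition order in composeAllTransformers above: the ComplexTypeTransformer is added after any custom transformers but before the default transformers, so flattened columns already exist when the default, schema-driven transformers run. The sketch below illustrates the flattening this enables on a single GenericRow; it is an illustration only, the class name ComplexTypeFlatteningSketch is hypothetical, and ComplexTypeTransformer is assumed to live in the same package as CompositeTransformer.

// Sketch only: expected flattening behaviour, assuming tableConfig carries a
// ComplexTypeConfig with "." as the delimiter (as in the test above).
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.readers.GenericRow;

public class ComplexTypeFlatteningSketch {
  public static void flattenExample(TableConfig tableConfig) {
    // Build a row with a nested map, mirroring createRecordReaderWithComplexType above.
    Map<String, Object> inner1 = new HashMap<>();
    inner1.put("inner2", "inner2v");
    Map<String, Object> campaign = new HashMap<>();
    campaign.put("inner1", inner1);
    GenericRow row = new GenericRow();
    row.putValue("campaign", campaign);

    // Returns null when the table config has no ComplexTypeConfig.
    ComplexTypeTransformer transformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
    if (transformer != null) {
      GenericRow flattened = transformer.transform(row);
      // The nested value is now addressable as a top-level column:
      // flattened.getValue("campaign.inner1.inner2") -> "inner2v"
    }
  }
}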
