diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java new file mode 100644 index 000000000000..c94f1f723f9c --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class CLPEncodingRealtimeIntegrationTest extends BaseClusterIntegrationTestSet { + private List _avroFiles; + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + _avroFiles = unpackAvroData(_tempDir); + + // Start the Pinot cluster + startZk(); + // Start a customized controller with more frequent realtime segment validation + startController(); + startBroker(); + startServers(1); + + startKafka(); + pushAvroIntoKafka(_avroFiles); + + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0)); + addTableConfig(tableConfig); + + waitForAllDocsLoaded(600_000L); + } + + @Nullable + @Override + protected List getInvertedIndexColumns() { + return null; + } + + @Nullable + @Override + protected List getRangeIndexColumns() { + return null; + } + + @Nullable + @Override + protected List getBloomFilterColumns() { + return null; + } + + @Nullable + @Override + protected String getSortedColumn() { + return null; + } + + @Override + protected List getNoDictionaryColumns() { + return Collections.singletonList("logLine"); + } + + @Test + public void testValues() + throws Exception { + assertEquals(getCurrentCountStarResult(), getCountStarResult()); + } + + protected int getRealtimeSegmentFlushSize() { + return 30; + } + + @Override + protected long getCountStarResult() { + return 100; + } + + @Override + protected String getTableName() { + return "clpEncodingIT"; + } + + @Override + protected String getAvroTarFileName() { + return "clpEncodingITData.tar.gz"; + } + + @Override + protected String getSchemaFileName() { + return "clpEncodingRealtimeIntegrationTestSchema.schema"; + } + + @Override + protected String getTimeColumnName() { + return "timestampInEpoch"; + } + + @Override + protected List getFieldConfigs() { + List fieldConfigs = new ArrayList<>(); + fieldConfigs.add( + new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null, FieldConfig.CompressionCodec.CLP, null, + null, null, null)); + + return fieldConfigs; + } + + @Override + protected IngestionConfig getIngestionConfig() { + List transforms = new ArrayList<>(); + transforms.add(new TransformConfig("timestampInEpoch", "now()")); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setTransformConfigs(transforms); + + return ingestionConfig; + } +} diff --git a/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz b/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz new file mode 100644 index 000000000000..c2f5baaa5f6b Binary files /dev/null and b/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz differ diff --git a/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema new file mode 100644 index 000000000000..9aa38a1a19d7 --- /dev/null +++ b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema @@ -0,0 +1,18 @@ +{ + "schemaName": "clpEncodingIT", + "dimensionFieldSpecs": [ + { + "name": "logLine", + "dataType": "STRING" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "timestampInEpoch", + "dataType": "LONG", + "notNull": false, + "format": "1:MILLISECONDS:EPOCH", + "granularity": "1:MILLISECONDS" + } + ] +} \ No newline at end of file diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java index 76f4e3c6f288..135035bb978a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java @@ -24,6 +24,7 @@ import org.apache.pinot.segment.local.io.util.PinotDataBitSet; import org.apache.pinot.segment.local.io.util.VarLengthValueReader; import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -127,7 +128,8 @@ public String getString(int docId, CLPReaderContext context) { } @Override - public void close() throws IOException { + public void close() + throws IOException { } @Override @@ -136,14 +138,17 @@ public CLPReaderContext createContext() { _encodedVarFwdIndexReader.createContext()); } + @Override + public ChunkCompressionType getCompressionType() { + return ChunkCompressionType.PASS_THROUGH; + } public static final class CLPReaderContext implements ForwardIndexReaderContext { private final FixedBitMVForwardIndexReader.Context _dictVarsReaderContext; private final ForwardIndexReaderContext _logTypeReaderContext; private final VarByteChunkForwardIndexReaderV4.ReaderContext _encodedVarReaderContext; - public CLPReaderContext( - FixedBitMVForwardIndexReader.Context dictVarsReaderContext, + public CLPReaderContext(FixedBitMVForwardIndexReader.Context dictVarsReaderContext, ForwardIndexReaderContext logTypeReaderContext, VarByteChunkForwardIndexReaderV4.ReaderContext encodedVarReaderContext) { _dictVarsReaderContext = dictVarsReaderContext; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java index 52abf41ea172..e7e1827efabf 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java @@ -273,8 +273,8 @@ private void extractCompressionCodecConfigsFromTableConfig(TableConfig tableConf List fieldConfigList = tableConfig.getFieldConfigList(); if (fieldConfigList != null) { for (FieldConfig fieldConfig : fieldConfigList) { - if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW - && fieldConfig.getCompressionCodec() != null) { + if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW && fieldConfig.getCompressionCodec() != null + && fieldConfig.getCompressionCodec().isApplicableToRawIndex()) { _rawIndexCreationColumns.add(fieldConfig.getName()); _rawIndexCompressionType.put(fieldConfig.getName(), ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name())); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java index 35ee54aa3ae1..132705036b3e 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java @@ -58,6 +58,7 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec if (compressionCodec != null) { switch (compressionCodec) { case PASS_THROUGH: + case CLP: _chunkCompressionType = ChunkCompressionType.PASS_THROUGH; _dictIdCompressionType = null; break; @@ -77,10 +78,6 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT; _chunkCompressionType = null; break; - case CLP: - _chunkCompressionType = null; - _dictIdCompressionType = null; - break; default: throw new IllegalStateException("Unsupported compression codec: " + compressionCodec); }