Commit: Add ITs
Saurabh Dubey authored and committed on Mar 13, 2024
1 parent 951df7b · commit fcfac6b
Showing 6 changed files with 178 additions and 9 deletions.
@@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;


public class CLPEncodingRealtimeIntegrationTest extends BaseClusterIntegrationTestSet {
  private List<File> _avroFiles;

  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
    _avroFiles = unpackAvroData(_tempDir);

    // Start the Pinot cluster
    startZk();
    // Start a customized controller with more frequent realtime segment validation
    startController();
    startBroker();
    startServers(1);

    startKafka();
    pushAvroIntoKafka(_avroFiles);

    Schema schema = createSchema();
    addSchema(schema);
    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
    addTableConfig(tableConfig);

    waitForAllDocsLoaded(600_000L);
  }

  @Nullable
  @Override
  protected List<String> getInvertedIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getRangeIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getBloomFilterColumns() {
    return null;
  }

  @Nullable
  @Override
  protected String getSortedColumn() {
    return null;
  }

  @Override
  protected List<String> getNoDictionaryColumns() {
    return Collections.singletonList("logLine");
  }

  @Test
  public void testValues()
      throws Exception {
    assertEquals(getCurrentCountStarResult(), getCountStarResult());
  }

  protected int getRealtimeSegmentFlushSize() {
    return 30;
  }

  @Override
  protected long getCountStarResult() {
    return 100;
  }

  @Override
  protected String getTableName() {
    return "clpEncodingIT";
  }

  @Override
  protected String getAvroTarFileName() {
    return "clpEncodingITData.tar.gz";
  }

  @Override
  protected String getSchemaFileName() {
    return "clpEncodingRealtimeIntegrationTestSchema.schema";
  }

  @Override
  protected String getTimeColumnName() {
    return "timestampInEpoch";
  }

  @Override
  protected List<FieldConfig> getFieldConfigs() {
    List<FieldConfig> fieldConfigs = new ArrayList<>();
    fieldConfigs.add(
        new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null, FieldConfig.CompressionCodec.CLP, null,
            null, null, null));

    return fieldConfigs;
  }

  @Override
  protected IngestionConfig getIngestionConfig() {
    List<TransformConfig> transforms = new ArrayList<>();
    transforms.add(new TransformConfig("timestampInEpoch", "now()"));

    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setTransformConfigs(transforms);

    return ingestionConfig;
  }
}
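For readers wiring CLP outside this test harness: the setup reduces to one FieldConfig entry (RAW encoding plus the CLP codec) on a no-dictionary string column. Below is a minimal standalone sketch using the SPI's TableConfigBuilder; the builder calls and class layout are illustrative assumptions, and only the FieldConfig line is taken verbatim from getFieldConfigs() above.

// Illustrative sketch, not part of this commit: building an equivalent
// realtime table config for a CLP-compressed "logLine" column.
import java.util.Collections;

import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class ClpTableConfigSketch {
  public static TableConfig build() {
    // Same 9-arg FieldConfig as getFieldConfigs() above: RAW encoding,
    // CLP compression codec, everything else left null.
    FieldConfig logLine =
        new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null, FieldConfig.CompressionCodec.CLP, null,
            null, null, null);
    return new TableConfigBuilder(TableType.REALTIME)
        .setTableName("clpEncodingIT")
        .setTimeColumnName("timestampInEpoch")
        .setNoDictionaryColumns(Collections.singletonList("logLine"))
        .setFieldConfigList(Collections.singletonList(logLine))
        .build();
  }
}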
Binary file not shown.
@@ -0,0 +1,18 @@
{
  "schemaName": "clpEncodingIT",
  "dimensionFieldSpecs": [
    {
      "name": "logLine",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestampInEpoch",
      "dataType": "LONG",
      "notNull": false,
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
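The schema pairs the raw logLine string with an epoch-millis time column. A small sketch of loading and sanity-checking it, assuming the file sits on the test classpath (Schema.fromInputStream and getDateTimeSpec are from the Pinot SPI; the wrapper class is illustrative):

// Illustrative sketch: load the schema resource and verify the time column.
import java.io.InputStream;

import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;

public class ClpSchemaSketch {
  public static Schema load() throws Exception {
    try (InputStream in = ClpSchemaSketch.class.getClassLoader()
        .getResourceAsStream("clpEncodingRealtimeIntegrationTestSchema.schema")) {
      Schema schema = Schema.fromInputStream(in);
      // getTimeColumnName() in the test points at this dateTimeFieldSpec.
      DateTimeFieldSpec timeSpec = schema.getDateTimeSpec("timestampInEpoch");
      if (timeSpec == null) {
        throw new IllegalStateException("Missing time column: timestampInEpoch");
      }
      return schema;
    }
  }
}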
@@ -24,6 +24,7 @@
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
 import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -127,7 +128,8 @@ public String getString(int docId, CLPReaderContext context) {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close()
+      throws IOException {
   }
 
   @Override
@@ -136,14 +138,17 @@ public CLPReaderContext createContext() {
         _encodedVarFwdIndexReader.createContext());
   }
 
+  @Override
+  public ChunkCompressionType getCompressionType() {
+    return ChunkCompressionType.PASS_THROUGH;
+  }
 
   public static final class CLPReaderContext implements ForwardIndexReaderContext {
     private final FixedBitMVForwardIndexReader.Context _dictVarsReaderContext;
     private final ForwardIndexReaderContext _logTypeReaderContext;
     private final VarByteChunkForwardIndexReaderV4.ReaderContext _encodedVarReaderContext;
 
-    public CLPReaderContext(
-        FixedBitMVForwardIndexReader.Context dictVarsReaderContext,
+    public CLPReaderContext(FixedBitMVForwardIndexReader.Context dictVarsReaderContext,
         ForwardIndexReaderContext logTypeReaderContext,
         VarByteChunkForwardIndexReaderV4.ReaderContext encodedVarReaderContext) {
       _dictVarsReaderContext = dictVarsReaderContext;
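The override added here advertises PASS_THROUGH because CLP performs its own encoding, so chunks need no additional chunk-level codec. A hedged sketch of how a caller might branch on this (the helper is illustrative; only getCompressionType() comes from the diff):

// Illustrative helper, not from this commit: callers such as segment
// conversion code can skip re-compression when chunks are stored as-is.
static boolean storesChunksUncompressed(ForwardIndexReader<?> reader) {
  return reader.getCompressionType() == ChunkCompressionType.PASS_THROUGH;
}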
@@ -273,8 +273,8 @@ private void extractCompressionCodecConfigsFromTableConfig(TableConfig tableConfig) {
     List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
     if (fieldConfigList != null) {
       for (FieldConfig fieldConfig : fieldConfigList) {
-        if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW
-            && fieldConfig.getCompressionCodec() != null) {
+        if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW && fieldConfig.getCompressionCodec() != null
+            && fieldConfig.getCompressionCodec().isApplicableToRawIndex()) {
           _rawIndexCreationColumns.add(fieldConfig.getName());
           _rawIndexCompressionType.put(fieldConfig.getName(),
               ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name()));
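The extra isApplicableToRawIndex() check matters because the line below it maps the codec name onto ChunkCompressionType, which has no CLP member; CLP columns get their forward index from CLPForwardIndexCreatorV1 instead. A hedged sketch of the enum shape this guard relies on (member list and flags are assumptions, simplified from FieldConfig.CompressionCodec):

// Hedged sketch of FieldConfig.CompressionCodec; values and flags are
// illustrative assumptions, not the exact Pinot source.
public enum CompressionCodec {
  PASS_THROUGH(true, false),
  SNAPPY(true, false),
  ZSTANDARD(true, false),
  LZ4(true, false),
  MV_ENTRY_DICT(false, true), // dictionary-side codec, never a raw-index codec
  CLP(false, false);          // handled by the dedicated CLP creator, so excluded here

  private final boolean _applicableToRawIndex;
  private final boolean _applicableToDictEncodedIndex;

  CompressionCodec(boolean applicableToRawIndex, boolean applicableToDictEncodedIndex) {
    _applicableToRawIndex = applicableToRawIndex;
    _applicableToDictEncodedIndex = applicableToDictEncodedIndex;
  }

  public boolean isApplicableToRawIndex() {
    return _applicableToRawIndex;
  }

  public boolean isApplicableToDictEncodedIndex() {
    return _applicableToDictEncodedIndex;
  }
}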
@@ -58,6 +58,7 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
     if (compressionCodec != null) {
       switch (compressionCodec) {
         case PASS_THROUGH:
+        case CLP:
           _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
           _dictIdCompressionType = null;
           break;
@@ -77,10 +78,6 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
           _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
           _chunkCompressionType = null;
           break;
-        case CLP:
-          _chunkCompressionType = null;
-          _dictIdCompressionType = null;
-          break;
         default:
           throw new IllegalStateException("Unsupported compression codec: " + compressionCodec);
       }
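Net effect of the two hunks above: CLP now shares the PASS_THROUGH branch instead of leaving both compression types null, which is consistent with the reader's getCompressionType() override earlier in this commit. A compact sketch of the resulting chunk-side mapping (the helper is illustrative; the real logic is the constructor switch above):

// Illustrative mapping, not from this commit: single-value raw path only.
// MV_ENTRY_DICT is resolved on the dictionary side in the real constructor.
static ChunkCompressionType toChunkCompressionType(FieldConfig.CompressionCodec codec) {
  switch (codec) {
    case PASS_THROUGH:
    case CLP: // CLP chunks are already encoded; no extra chunk codec on top
      return ChunkCompressionType.PASS_THROUGH;
    case SNAPPY:
      return ChunkCompressionType.SNAPPY;
    case ZSTANDARD:
      return ChunkCompressionType.ZSTANDARD;
    case LZ4:
      return ChunkCompressionType.LZ4;
    default:
      throw new IllegalStateException("Unsupported compression codec: " + codec);
  }
}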
