From fcfac6b800c53cef505aa23194729ca3d12f7875 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Wed, 13 Mar 2024 12:36:26 +0530 Subject: [PATCH] Add ITs --- .../CLPEncodingRealtimeIntegrationTest.java | 149 ++++++++++++++++++ .../test/resources/clpEncodingITData.tar.gz | Bin 0 -> 1863 bytes ...codingRealtimeIntegrationTestSchema.schema | 18 +++ .../forward/CLPForwardIndexReaderV1.java | 11 +- .../spi/creator/SegmentGeneratorConfig.java | 4 +- .../segment/spi/index/ForwardIndexConfig.java | 5 +- 6 files changed, 178 insertions(+), 9 deletions(-) create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java create mode 100644 pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz create mode 100644 pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java new file mode 100644 index 000000000000..c94f1f723f9c --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class CLPEncodingRealtimeIntegrationTest extends BaseClusterIntegrationTestSet { + private List _avroFiles; + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + _avroFiles = unpackAvroData(_tempDir); + + // Start the Pinot cluster + startZk(); + // Start a customized controller with more frequent realtime segment validation + startController(); + startBroker(); + startServers(1); + + startKafka(); + pushAvroIntoKafka(_avroFiles); + + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0)); + addTableConfig(tableConfig); + + waitForAllDocsLoaded(600_000L); + } + + @Nullable + @Override + protected List getInvertedIndexColumns() { + return null; + } + + @Nullable + @Override + protected List getRangeIndexColumns() { + return null; + } + + @Nullable + @Override + protected List getBloomFilterColumns() { + return null; + } + + @Nullable + @Override + protected String getSortedColumn() { + return null; + } + + @Override + protected List getNoDictionaryColumns() { + return Collections.singletonList("logLine"); + } + + @Test + public void testValues() + throws Exception { + assertEquals(getCurrentCountStarResult(), getCountStarResult()); + } + + protected int getRealtimeSegmentFlushSize() { + return 30; + } + + @Override + protected long getCountStarResult() { + return 100; + } + + @Override + protected String getTableName() { + return "clpEncodingIT"; + } + + @Override + protected String getAvroTarFileName() { + return "clpEncodingITData.tar.gz"; + } + + @Override + protected String getSchemaFileName() { + return "clpEncodingRealtimeIntegrationTestSchema.schema"; + } + + @Override + protected String getTimeColumnName() { + return "timestampInEpoch"; + } + + @Override + protected List getFieldConfigs() { + List fieldConfigs = new ArrayList<>(); + fieldConfigs.add( + new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null, FieldConfig.CompressionCodec.CLP, null, + null, null, null)); + + return fieldConfigs; + } + + @Override + protected IngestionConfig getIngestionConfig() { + List transforms = new ArrayList<>(); + transforms.add(new TransformConfig("timestampInEpoch", "now()")); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setTransformConfigs(transforms); + + return ingestionConfig; + } +} diff --git a/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz b/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..c2f5baaa5f6b1541b3e2243956478a56aead8cfb GIT binary patch literal 1863 zcmV-N2e|kjiwFQFOYvm@1MOPPi`--w?}~>WJc{R+J#}E4zTY1Mde~}9v9-k(JctaL zne1-5Gs%)Q}N`R%*>l)-c6c!W|tytE;H$* z&+|UN=j-=8*|DGbp8p_;FPz;4A>@>T{8MmNd_p|G&%Y4}2|$zyiYX%m0;q2|y#U}D zp2=LZH1iXEl+>Rk{_wyX&xXPA3D4=r_V!xj%#8u&^M7*P9^T)2`L#R%ymWLBO#LTY z54W@9qhPzgodlye8E@}wNB%UwA5D%1VKx}+kQvM$*1zqA!DO6n_uqfGd~6c$-wdNb z|F)2mW=RGfyo${ROd`R3D$zimBzy!GQ2 z5?3I<0wqAI4^dy^dE=dHw?Xh>Fq&m?;=Q%_(g%07_cATr{045k2^feYa1f^%ms2AQ880*pKe!U*Un*be~IH<#{dMHuxwfCXY;c$@_( zNP=`W$v_%J8OUNVPQnL4^2e%H)NIa*R#nxCms$}j)IurR)c)F}_UJe}h$HV#@$v2~ ziUO_pUZ|n!>;Ikk6Yx49at93KIMZJ*BZhbMDZ=~)DYygnibLu<_{`FQ<~mYsjXtw? zE11T~@m+lv^ug{V9vy;YJRTr4Afk9ko9d?vGqk1C;(2f>NV9P2XZqJP{3rm3Yuk6V z<|GIJh0K%S?cLs&&(mHgyyZvwdyZ{9i^gDR4uJ;@=o^1BIsUdul5x@1*iX{ndgANI z9xwg1>yHkDXxshWpXdb(EnWETMRDydyt3^sf3kh0NjZ zX{mepp?AP2o*vDLI1jU5n&ftIEQO_Gr7Hc3Rn@OH9=I`Nb4B^VPKemTZAHj(E_-&c z;7-AU$pE8ugN0Dy;XGJQ%OtThNkG>7(-KP7N^K;KR956R(iXSm_G#(Grjki4ikc#L zo8Ip%y>!DoK@f>Ekcx=>*^YpgnB(H^wG!d+tCAJq>M=!9d6Nt$YL@6u0&s)83 zNzcWFh$=7?8DO};1uzu10b{$IJ5)wgx|~wua#?e^4ZYYD zdMW5vmQ?8my{;Au?Ck?KQ=D3vt3%IdNpTeG*}zJNTv8QaQ3-5~No)*(u7I0DP$zjz z83>4kJ-h3w?hpuE>ADp*#YhDM^zlm#Dq0%kFp2%~Nb}o)z?B~Pz@N<8n-k+^Cjv?IwuK`liqk%X%ntm=bTxm;8AL8R(`!Iu6fdLZr4 z9IbZHS#hl$2wv)?bYAt8Q#-Kqy4ECuIk6mtU07x%-l-@ZRU?^J!@+pXhV#j4;$Y&C zI0$Z#IUMF=na&*B`&}tn&HIWN?+eUx?{nTd=Gop~)$8nip^)u;vL1P=c2ZRJ{`Z!K zRnpguk3&OYDa5j8!?7usq>yl{EH0&XN(N@&z^p8b@O02Xbu>)TqMAkatggBg>)uuE z6E>6GHh&>Q>L}%^_qQc>J+6q5VJwuXIay?Gvy7$LIa0WkXCPG)Xrr`BJG=EXgiUi{ za4~7RqSUOp=5|)sYX)3~^rUIWrEny?syQyZvb}m-5)+pU3uF@^?7(;{OBM_pGTsKZ z6W`kAVh|6|N zomAhDYOQZ%(?%#HV|Gq8(}kAx22$n1RWWH-dm`3DLsT~ERWz&yPP#Vm)rGB@4nO{^*XN)ght-qOXvb!g#Ll7M+xPiQsW?UQ$$$X8;A%kt!ckrRO4`1KOhP}>Rb_l8p;+Ig1*w^3eQ5xAo)C13rUs~7zzXy@1Yb$*>+=hq9k{sYjGT<`!c002pl Bs~Z3S literal 0 HcmV?d00001 diff --git a/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema new file mode 100644 index 000000000000..9aa38a1a19d7 --- /dev/null +++ b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema @@ -0,0 +1,18 @@ +{ + "schemaName": "clpEncodingIT", + "dimensionFieldSpecs": [ + { + "name": "logLine", + "dataType": "STRING" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "timestampInEpoch", + "dataType": "LONG", + "notNull": false, + "format": "1:MILLISECONDS:EPOCH", + "granularity": "1:MILLISECONDS" + } + ] +} \ No newline at end of file diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java index 76f4e3c6f288..135035bb978a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java @@ -24,6 +24,7 @@ import org.apache.pinot.segment.local.io.util.PinotDataBitSet; import org.apache.pinot.segment.local.io.util.VarLengthValueReader; import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -127,7 +128,8 @@ public String getString(int docId, CLPReaderContext context) { } @Override - public void close() throws IOException { + public void close() + throws IOException { } @Override @@ -136,14 +138,17 @@ public CLPReaderContext createContext() { _encodedVarFwdIndexReader.createContext()); } + @Override + public ChunkCompressionType getCompressionType() { + return ChunkCompressionType.PASS_THROUGH; + } public static final class CLPReaderContext implements ForwardIndexReaderContext { private final FixedBitMVForwardIndexReader.Context _dictVarsReaderContext; private final ForwardIndexReaderContext _logTypeReaderContext; private final VarByteChunkForwardIndexReaderV4.ReaderContext _encodedVarReaderContext; - public CLPReaderContext( - FixedBitMVForwardIndexReader.Context dictVarsReaderContext, + public CLPReaderContext(FixedBitMVForwardIndexReader.Context dictVarsReaderContext, ForwardIndexReaderContext logTypeReaderContext, VarByteChunkForwardIndexReaderV4.ReaderContext encodedVarReaderContext) { _dictVarsReaderContext = dictVarsReaderContext; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java index 52abf41ea172..e7e1827efabf 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java @@ -273,8 +273,8 @@ private void extractCompressionCodecConfigsFromTableConfig(TableConfig tableConf List fieldConfigList = tableConfig.getFieldConfigList(); if (fieldConfigList != null) { for (FieldConfig fieldConfig : fieldConfigList) { - if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW - && fieldConfig.getCompressionCodec() != null) { + if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW && fieldConfig.getCompressionCodec() != null + && fieldConfig.getCompressionCodec().isApplicableToRawIndex()) { _rawIndexCreationColumns.add(fieldConfig.getName()); _rawIndexCompressionType.put(fieldConfig.getName(), ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name())); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java index 35ee54aa3ae1..132705036b3e 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java @@ -58,6 +58,7 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec if (compressionCodec != null) { switch (compressionCodec) { case PASS_THROUGH: + case CLP: _chunkCompressionType = ChunkCompressionType.PASS_THROUGH; _dictIdCompressionType = null; break; @@ -77,10 +78,6 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT; _chunkCompressionType = null; break; - case CLP: - _chunkCompressionType = null; - _dictIdCompressionType = null; - break; default: throw new IllegalStateException("Unsupported compression codec: " + compressionCodec); }