Skip to content

Commit

Permalink
Lint + misc
Browse files Browse the repository at this point in the history
  • Loading branch information
Saurabh Dubey authored and Saurabh Dubey committed Mar 13, 2024
1 parent fcfac6b commit c81fa0d
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,11 @@ public interface CLPStatsProvider {
CLPStats getCLPStats();

class CLPStats {
private final EncodedMessage _clpEncodedMessage;
private final MessageEncoder _clpMessageEncoder;
int _totalNumberOfDictVars = 0;
int _totalNumberOfEncodedVars = 0;
int _maxNumberOfEncodedVars = 0;
private String[] _sortedLogTypeValues;
private String[] _sortedDictVarValues;
private Set<String> _logTypes = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);
private Set<String> _dictVars = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);

public CLPStats(String[] sortedLogTypeValues, String[] sortedDictVarValues, int totalNumberOfDictVars,
int totalNumberOfEncodedVars, int maxNumberOfEncodedVars) {
Expand All @@ -50,56 +46,6 @@ public CLPStats(String[] sortedLogTypeValues, String[] sortedDictVarValues, int
_totalNumberOfDictVars = totalNumberOfDictVars;
_totalNumberOfEncodedVars = totalNumberOfEncodedVars;
_maxNumberOfEncodedVars = maxNumberOfEncodedVars;
_clpEncodedMessage = null;
_clpMessageEncoder = null;
}

public CLPStats() {
_clpEncodedMessage = new EncodedMessage();
_clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
}

public void collect(String value) {
String logType;
String[] dictVars;
Long[] encodedVars;

try {
_clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
logType = _clpEncodedMessage.getLogTypeAsString();
dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
} catch (IOException e) {
throw new IllegalArgumentException("Failed to encode message: " + value, e);
}

if (logType == null) {
logType = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
}

if (dictVars == null) {
dictVars = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
}

if (encodedVars == null) {
encodedVars = new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
}

_logTypes.add(logType);
_dictVars.addAll(Arrays.asList(dictVars));
_totalNumberOfDictVars += dictVars.length;
_totalNumberOfEncodedVars += encodedVars.length;
_maxNumberOfEncodedVars = Math.max(_maxNumberOfEncodedVars, encodedVars.length);
}

public void seal() {
_sortedLogTypeValues = _logTypes.toArray(new String[0]);
_logTypes = null;
Arrays.sort(_sortedLogTypeValues);
_sortedDictVarValues = _dictVars.toArray(new String[0]);
_dictVars = null;
Arrays.sort(_sortedDictVarValues);
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.stats;

import com.google.common.annotations.VisibleForTesting;
import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
import com.yscope.clp.compressorfrontend.EncodedMessage;
import com.yscope.clp.compressorfrontend.MessageEncoder;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.FieldSpec;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand All @@ -34,12 +40,12 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
private int _maxRowLength = 0;
private String[] _sortedValues;
private boolean _sealed = false;
private CLPStats _clpStats;
private CLPStatsCollector _clpStatsCollector;

public StringColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
super(column, statsCollectorConfig);
if (_fieldConfig != null && _fieldConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
_clpStats = new CLPStats();
_clpStatsCollector = new CLPStatsCollector();
}
}

Expand All @@ -53,8 +59,8 @@ public void collect(Object entry) {
for (Object obj : values) {
String value = (String) obj;
_values.add(value);
if (_clpStats != null) {
_clpStats.collect(value);
if (_clpStatsCollector != null) {
_clpStatsCollector.collect(value);
}

int length = value.getBytes(UTF_8).length;
Expand All @@ -69,8 +75,8 @@ public void collect(Object entry) {
} else {
String value = (String) entry;
addressSorted(value);
if (_clpStats != null) {
_clpStats.collect(value);
if (_clpStatsCollector != null) {
_clpStatsCollector.collect(value);
}
if (_values.add(value)) {
if (isPartitionEnabled()) {
Expand All @@ -87,7 +93,10 @@ public void collect(Object entry) {

@Override
public CLPStats getCLPStats() {
return _clpStats;
if (_sealed) {
return _clpStatsCollector.getCLPStats();
}
throw new IllegalStateException("you must seal the collector first before asking for clp stats");
}

@Override
Expand Down Expand Up @@ -141,10 +150,77 @@ public void seal() {
_sortedValues = _values.toArray(new String[0]);
_values = null;
Arrays.sort(_sortedValues);
if (_clpStats != null) {
_clpStats.seal();
if (_clpStatsCollector != null) {
_clpStatsCollector.seal();
}
_sealed = true;
}
}

@VisibleForTesting
public static class CLPStatsCollector {
private final EncodedMessage _clpEncodedMessage;
private final MessageEncoder _clpMessageEncoder;
int _totalNumberOfDictVars = 0;
int _totalNumberOfEncodedVars = 0;
int _maxNumberOfEncodedVars = 0;
private Set<String> _logTypes = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);
private Set<String> _dictVars = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);
private CLPStats _clpStats;

public CLPStatsCollector() {
_clpEncodedMessage = new EncodedMessage();
_clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
}

public void collect(String value) {
String logType;
String[] dictVars;
Long[] encodedVars;

try {
_clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
logType = _clpEncodedMessage.getLogTypeAsString();
dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
} catch (IOException e) {
throw new IllegalArgumentException("Failed to encode message: " + value, e);
}

if (logType == null) {
logType = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
}

if (dictVars == null) {
dictVars = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
}

if (encodedVars == null) {
encodedVars = new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
}

_logTypes.add(logType);
_dictVars.addAll(Arrays.asList(dictVars));
_totalNumberOfDictVars += dictVars.length;
_totalNumberOfEncodedVars += encodedVars.length;
_maxNumberOfEncodedVars = Math.max(_maxNumberOfEncodedVars, encodedVars.length);
}

public void seal() {
String[] sortedLogTypeValues = _logTypes.toArray(new String[0]);
_logTypes = null;
Arrays.sort(sortedLogTypeValues);
String[] sortedDictVarValues = _dictVars.toArray(new String[0]);
_dictVars = null;
Arrays.sort(sortedDictVarValues);
_clpStats =
new CLPStats(sortedLogTypeValues, sortedDictVarValues, _totalNumberOfDictVars, _totalNumberOfEncodedVars,
_maxNumberOfEncodedVars);
}

public CLPStats getCLPStats() {
return _clpStats;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public static ForwardIndexReaderFactory getInstance() {
return StandardIndexes.forward();
}

@Override
protected ForwardIndexReader createIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata,
ForwardIndexConfig indexConfig)
throws IndexReaderConstraintException {
return createIndexReader(dataBuffer, metadata);
}

public static ForwardIndexReader createIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) {
if (metadata.hasDictionary()) {
if (metadata.isSingleValue()) {
Expand Down Expand Up @@ -120,11 +127,4 @@ private static ForwardIndexReader createNonV4RawIndexReader(PinotDataBuffer data
}
}
}

@Override
protected ForwardIndexReader createIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata,
ForwardIndexConfig indexConfig)
throws IndexReaderConstraintException {
return createIndexReader(dataBuffer, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ public void testCLPWriter()
"2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+ "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: true");
logLines.add(
"2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 10.12"
+ ".15.1 GET https://10.12.15.10:8443/health?checkType=liveness, content-type null status code 200 OK");
"2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+ ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
logLines.add(
"2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 10.12"
+ ".19.5 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
"2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 0.0"
+ ".0.0 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
+ "application/json status code 200 OK");

Schema schema = new Schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
import org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.spi.data.FieldSpec;
import org.testng.Assert;
Expand Down Expand Up @@ -59,18 +60,16 @@ public void testString()
List<String> logLines = new ArrayList<>();
logLines.add(
"2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+ "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective: "
+ "true");
+ "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective: true");
logLines.add(
"2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+ "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: "
+ "true");
+ "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: true");
logLines.add(
"2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 10.12"
+ ".15.1 GET https://10.12.15.10:8443/health?checkType=liveness, content-type null status code 200 OK");
"2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+ ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
logLines.add(
"2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 10.12"
+ ".19.5 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
"2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 0.0"
+ ".0.0 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
+ "application/json status code 200 OK");

for (int i = 0; i < rows; i++) {
Expand All @@ -82,11 +81,13 @@ public void testString()
}

// Verify clp stats
CLPStatsProvider.CLPStats stats = new CLPStatsProvider.CLPStats();
StringColumnPreIndexStatsCollector.CLPStatsCollector statsCollector =
new StringColumnPreIndexStatsCollector.CLPStatsCollector();
for (int i = 0; i < rows; i++) {
stats.collect(logLines.get(i));
statsCollector.collect(logLines.get(i));
}
stats.seal();
statsCollector.seal();
CLPStatsProvider.CLPStats stats = statsCollector.getCLPStats();

CLPStatsProvider.CLPStats mutableIndexStats = readerWriter.getCLPStats();
Assert.assertEquals(stats.getTotalNumberOfDictVars(), mutableIndexStats.getTotalNumberOfDictVars());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,17 @@ final class Builder {
private boolean _textCommitOnClose;

public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo columnIndexCreationInfo) {
return withLengthOfLongestEntry(
columnIndexCreationInfo.getLengthOfLongestEntry()).withMaxNumberOfMultiValueElements(
columnIndexCreationInfo.getMaxNumberOfMultiValueElements())
return withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
.withMaxNumberOfMultiValueElements(columnIndexCreationInfo.getMaxNumberOfMultiValueElements())
.withMaxRowLengthInBytes(columnIndexCreationInfo.getMaxRowLengthInBytes())
.withMinValue((Comparable<?>) columnIndexCreationInfo.getMin())
.withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
.withSortedUniqueElementsArray(columnIndexCreationInfo.getSortedUniqueElementsArray())
.withColumnStatistics(columnIndexCreationInfo.getColumnStatistics())
.withCardinality(columnIndexCreationInfo.getDistinctValueCount())
.withFixedLength(columnIndexCreationInfo.isFixedLength()).sorted(columnIndexCreationInfo.isSorted());
.withFixedLength(columnIndexCreationInfo.isFixedLength())
.sorted(columnIndexCreationInfo.isSorted());
}

public Builder withColumnStatistics(ColumnStatistics columnStatistics) {
Expand Down Expand Up @@ -268,11 +268,12 @@ final class Common implements IndexCreationContext {
private final boolean _textCommitOnClose;
private final ColumnStatistics _columnStatistics;

public Common(File indexDir, int lengthOfLongestEntry, int maxNumberOfMultiValueElements, int maxRowLengthInBytes,
boolean onHeap, FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries, int totalDocs,
boolean hasDictionary, Comparable<?> minValue, Comparable<?> maxValue, boolean forwardIndexDisabled,
Object sortedUniqueElementsArray, boolean optimizeDictionary, boolean fixedLength, boolean textCommitOnClose,
ColumnStatistics columnStatistics) {
public Common(File indexDir, int lengthOfLongestEntry,
int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean onHeap,
FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries,
int totalDocs, boolean hasDictionary, Comparable<?> minValue, Comparable<?> maxValue,
boolean forwardIndexDisabled, Object sortedUniqueElementsArray, boolean optimizeDictionary,
boolean fixedLength, boolean textCommitOnClose, ColumnStatistics columnStatistics) {
_indexDir = indexDir;
_lengthOfLongestEntry = lengthOfLongestEntry;
_maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ private void extractCompressionCodecConfigsFromTableConfig(TableConfig tableConf
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList != null) {
for (FieldConfig fieldConfig : fieldConfigList) {
if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW && fieldConfig.getCompressionCodec() != null
if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW
&& fieldConfig.getCompressionCodec() != null
&& fieldConfig.getCompressionCodec().isApplicableToRawIndex()) {
_rawIndexCreationColumns.add(fieldConfig.getName());
_rawIndexCompressionType.put(fieldConfig.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static PinotByteBuffer mapFile(File file, boolean readOnly, long offset, int siz
}
}

public PinotByteBuffer(ByteBuffer buffer, boolean closeable, boolean flushable) {
private PinotByteBuffer(ByteBuffer buffer, boolean closeable, boolean flushable) {
super(closeable);
_buffer = buffer;
_flushable = flushable;
Expand Down

0 comments on commit c81fa0d

Please sign in to comment.