Add validations
Saurabh Dubey authored and Saurabh Dubey committed Mar 13, 2024
1 parent e474bb2 commit 951df7b
Showing 5 changed files with 36 additions and 19 deletions.
@@ -472,7 +472,7 @@ CompressionCodec getCompressionCodec() {
     CompressionCodec[] compressionCodecs = CompressionCodec.values();
     while (true) {
       CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)];
-      if (compressionCodec.isApplicableToRawIndex() && compressionCodec != CompressionCodec.CLP) {
+      if (compressionCodec.isApplicableToRawIndex()) {
         return compressionCodec;
       }
     }
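
The dropped CLP exclusion above is safe because of the enum change at the bottom of this commit: once CLP is declared as not applicable to the raw index, isApplicableToRawIndex() already filters it out. A minimal, self-contained sketch of that pattern (stand-in enum and picker, not the actual Pinot classes):

import java.util.Random;

// Illustrative stand-in for the CompressionCodec enum and the test helper above;
// the two booleans mirror (applicableToRawIndex, applicableToDictEncodedIndex).
public class RawCodecPickerSketch {
  enum Codec {
    SNAPPY(true, false),
    ZSTANDARD(true, false),
    LZ4(true, false),
    CLP(false, false),       // raw-index flag is now false, so the picker skips CLP on its own
    MV_ENTRY_DICT(false, true);

    private final boolean _applicableToRawIndex;
    private final boolean _applicableToDictEncodedIndex;

    Codec(boolean applicableToRawIndex, boolean applicableToDictEncodedIndex) {
      _applicableToRawIndex = applicableToRawIndex;
      _applicableToDictEncodedIndex = applicableToDictEncodedIndex;
    }

    boolean isApplicableToRawIndex() {
      return _applicableToRawIndex;
    }

    boolean isApplicableToDictEncodedIndex() {
      return _applicableToDictEncodedIndex;
    }
  }

  private static final Random RANDOM = new Random();

  // Same shape as the helper in the diff: no explicit "codec != CLP" check is required.
  static Codec getRandomRawCodec() {
    Codec[] codecs = Codec.values();
    while (true) {
      Codec codec = codecs[RANDOM.nextInt(codecs.length)];
      if (codec.isApplicableToRawIndex()) {
        return codec;
      }
    }
  }

  public static void main(String[] args) {
    for (int i = 0; i < 5; i++) {
      System.out.println(getRandomRawCodec());   // never prints CLP or MV_ENTRY_DICT
    }
  }
}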
@@ -61,7 +61,7 @@ public class CLPForwardIndexCreatorV1 implements ForwardIndexCreator {
   public static final byte[] MAGIC_BYTES = "CLP.v1".getBytes(StandardCharsets.UTF_8);
   private final String _column;
   private final int _numDocs;
-  private final File _baseIndexDir;
+  private final File _intermediateFilesDir;
   private final FileChannel _dataFile;
   private final ByteBuffer _fileBuffer;
   private final EncodedMessage _clpEncodedMessage;
@@ -82,35 +82,42 @@ public CLPForwardIndexCreatorV1(File baseIndexDir, String column, int numDocs, C
       throws IOException {
     _column = column;
     _numDocs = numDocs;
-    _baseIndexDir = baseIndexDir;
+    _intermediateFilesDir =
+        new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + ".clp.tmp");
+    if (_intermediateFilesDir.exists()) {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    } else {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    }
+
     _dataFile =
         new RandomAccessFile(new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION),
             "rw").getChannel();
     _fileBuffer = _dataFile.map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE);

     CLPStatsProvider statsCollector = (CLPStatsProvider) columnStatistics;
     _clpStats = statsCollector.getCLPStats();
-    _logTypeDictFile = new File(_baseIndexDir, _column + "_clp_logtype.dict");
+    _logTypeDictFile = new File(_intermediateFilesDir, _column + "_clp_logtype.dict");
     _logTypeDictCreator =
         new SegmentDictionaryCreator(_column + "_clp_logtype.dict", FieldSpec.DataType.STRING, _logTypeDictFile, true);
     _logTypeDictCreator.build(_clpStats.getSortedLogTypeValues());

-    _dictVarsDictFile = new File(_baseIndexDir, _column + "_clp_dictvars.dict");
+    _dictVarsDictFile = new File(_intermediateFilesDir, _column + "_clp_dictvars.dict");
     _dictVarsDictCreator =
         new SegmentDictionaryCreator(_column + "_clp_dictvars.dict", FieldSpec.DataType.STRING, _dictVarsDictFile,
             true);
     _dictVarsDictCreator.build(_clpStats.getSortedDictVarValues());

-    _logTypeFwdIndexFile = new File(_baseIndexDir, column + "_clp_logtype.fwd");
+    _logTypeFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_logtype.fwd");
     _logTypeFwdIndexWriter = new FixedBitSVForwardIndexWriter(_logTypeFwdIndexFile, numDocs,
         PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedLogTypeValues().length - 1));

-    _dictVarsFwdIndexFile = new File(_baseIndexDir, column + "_clp_dictvars.fwd");
+    _dictVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_dictvars.fwd");
     _dictVarsFwdIndexWriter =
         new FixedBitMVForwardIndexWriter(_dictVarsFwdIndexFile, numDocs, _clpStats.getTotalNumberOfDictVars(),
             PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedDictVarValues().length - 1));

-    _encodedVarsFwdIndexFile = new File(_baseIndexDir, column + "_clp_encodedvars.fwd");
+    _encodedVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_encodedvars.fwd");
     _encodedVarsFwdIndexWriter =
         new MultiValueFixedByteRawIndexCreator(_encodedVarsFwdIndexFile, ChunkCompressionType.LZ4, numDocs,
             FieldSpec.DataType.LONG, _clpStats.getMaxNumberOfEncodedVars(), false,
@@ -187,7 +194,7 @@ private void addCLPFields(String logtype, String[] dictVars, Long[] encodedVars)
   }

   @Override
-  public void close()
+  public void seal()
       throws IOException {
     // Append all of these into fileBuffer
     _logTypeDictCreator.seal();
@@ -247,18 +254,19 @@ public void close()
     totalSize += _encodedVarsFwdIndexFile.length();

     _dataFile.truncate(totalSize);
-
-    // Delete all temp files
-    FileUtils.deleteQuietly(_logTypeDictFile);
-    FileUtils.deleteQuietly(_dictVarsDictFile);
-    FileUtils.deleteQuietly(_logTypeFwdIndexFile);
-    FileUtils.deleteQuietly(_dictVarsFwdIndexFile);
-    FileUtils.deleteQuietly(_encodedVarsFwdIndexFile);
   }

   private void copyFileIntoBuffer(File file) throws IOException {
     try (FileChannel from = (FileChannel.open(file.toPath(), StandardOpenOption.READ))) {
       _fileBuffer.put(from.map(FileChannel.MapMode.READ_ONLY, 0, file.length()));
     }
   }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Delete all temp files
+    _dataFile.close();
+    FileUtils.deleteDirectory(_intermediateFilesDir);
+  }
 }
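
The refactor above splits the old close() into two steps: seal() assembles the per-column dictionary and forward-index files into the final raw forward index, and close() releases the data channel and deletes the whole intermediate directory. That lets callers invoke close() unconditionally (for example via try-with-resources) and still get temp files cleaned up even if sealing fails. A condensed, self-contained illustration of this lifecycle (a toy writer, not the real CLPForwardIndexCreatorV1):

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Toy writer illustrating the seal()/close() split: intermediate artifacts live in a temp dir,
// seal() merges them into the final file, close() always removes the temp dir.
public class TwoPhaseWriterSketch implements Closeable {
  private final Path _intermediateDir;
  private final Path _finalFile;

  public TwoPhaseWriterSketch(Path baseDir) throws IOException {
    _intermediateDir = Files.createDirectories(baseDir.resolve("col.fwd.tmp"));
    _finalFile = baseDir.resolve("col.fwd");
  }

  public void put(String value) throws IOException {
    // Stage each value as an intermediate artifact (stand-in for the dict/fwd-index files).
    Files.writeString(_intermediateDir.resolve("part-" + System.nanoTime()), value);
  }

  public void seal() throws IOException {
    // Concatenate the staged parts into the final file (stand-in for copyFileIntoBuffer + truncate).
    try (Stream<Path> parts = Files.list(_intermediateDir)) {
      StringBuilder merged = new StringBuilder();
      parts.sorted().forEach(p -> {
        try {
          merged.append(Files.readString(p)).append('\n');
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      });
      Files.writeString(_finalFile, merged.toString());
    }
  }

  @Override
  public void close() throws IOException {
    // Always delete the intermediate directory, mirroring FileUtils.deleteDirectory(_intermediateFilesDir).
    try (Stream<Path> paths = Files.walk(_intermediateDir)) {
      paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
    }
  }

  public static void main(String[] args) throws IOException {
    Path baseDir = Files.createTempDirectory("two-phase-writer");
    try (TwoPhaseWriterSketch writer = new TwoPhaseWriterSketch(baseDir)) {
      writer.put("log line 1");
      writer.put("log line 2");
      writer.seal();   // same order the updated test uses: put*, seal(), close()
    }
    System.out.println("Sealed file size: " + baseDir.resolve("col.fwd").toFile().length());
  }
}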
@@ -1192,8 +1192,14 @@ private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldCon
         CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
         switch (encodingType) {
           case RAW:
-            Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex(),
-                "Compression codec: %s is not applicable to raw index", compressionCodec);
+            Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex()
+                || compressionCodec == CompressionCodec.CLP, "Compression codec: %s is not applicable to raw index",
+                compressionCodec);
+            if (compressionCodec == CompressionCodec.CLP && schema != null) {
+              Preconditions.checkArgument(
+                  schema.getFieldSpecFor(columnName).getDataType().getStoredType() == DataType.STRING,
+                  "CLP compression codec can only be applied to string columns");
+            }
             break;
           case DICTIONARY:
             Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToDictEncodedIndex(),
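
A condensed, self-contained sketch of what the new RAW-encoding branch accepts and rejects, using stand-in enums rather than the actual FieldConfig/Schema types (the real code also skips the string-type check when no schema is passed in):

// Simplified version of the added validation: CLP bypasses the generic raw-index
// applicability test, but only for columns whose stored type is STRING.
public class ClpValidationSketch {
  enum Codec {
    SNAPPY(true), ZSTANDARD(true), LZ4(true), CLP(false);

    private final boolean _applicableToRawIndex;

    Codec(boolean applicableToRawIndex) {
      _applicableToRawIndex = applicableToRawIndex;
    }

    boolean isApplicableToRawIndex() {
      return _applicableToRawIndex;
    }
  }

  enum StoredType { INT, LONG, STRING }

  static void validateRawColumn(Codec codec, StoredType storedType) {
    if (!(codec == null || codec.isApplicableToRawIndex() || codec == Codec.CLP)) {
      throw new IllegalArgumentException("Compression codec: " + codec + " is not applicable to raw index");
    }
    if (codec == Codec.CLP && storedType != StoredType.STRING) {
      throw new IllegalArgumentException("CLP compression codec can only be applied to string columns");
    }
  }

  public static void main(String[] args) {
    validateRawColumn(Codec.CLP, StoredType.STRING);     // passes
    try {
      validateRawColumn(Codec.CLP, StoredType.LONG);     // rejected by the new check
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}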
@@ -97,6 +97,7 @@ public void testCLPWriter()
     for (String logLine : logLines) {
       clpForwardIndexCreatorV1.putString(logLine);
     }
+    clpForwardIndexCreatorV1.seal();
     clpForwardIndexCreatorV1.close();

     PinotDataBuffer pinotDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
@@ -126,7 +126,9 @@ public enum CompressionCodec {
   SNAPPY(true, false),
   ZSTANDARD(true, false),
   LZ4(true, false),
-  CLP(true, false),
+  // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has
+  // special handling (see {@link CLPForwardIndexCreatorV1})
+  CLP(false, false),

   // For MV dictionary encoded forward index, add a second level dictionary encoding for the multi-value entries
   MV_ENTRY_DICT(false, true);
