Replace IOPeon with SegmentWriteOutMedium; Improve buffer compression #4762
Conversation
Also, this PR no longer always compresses blocks of the full block size (which is 64K): if the leftover, or the whole column (e. g. if the segment is small), is smaller than that, a smaller block is compressed. This doesn't contradict the spec, but it is not supported by the current implementation; that is fixed in #4824. I think if #4824 makes it into Druid 0.11, it will be possible to apply this PR without bumping the segment version.
@jihoonson could you please review this PR?
Sure, I'll review soon.
Reviewed up to OutputMedium.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.defaultOutputMediumFactory`|`tmpFile` or `offHeapMemory`, see explanation above|`tmpFile`|
Maybe `druid.peon.defaultOutputMediumFactory` is better, because this configuration is used by peons.
This config could be used not only on peons, e. g. in Hadoop/Spark tasks.
From the perspective of Druid, Hadoop/Spark tasks are one of the task types which use external systems for indexing, but they are executed by peons. I think it doesn't matter how this configuration is used outside of Druid.
I forgot this comment. @leventov any thoughts?
Changed to `druid.peon.defaultOutputMediumFactory`
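For illustration, a hypothetical runtime.properties line using the renamed property (the value names come from the table above):

```properties
# select the off-heap implementation as the default for tasks run by peons
druid.peon.defaultOutputMediumFactory=offHeapMemory
```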
```diff
@@ -120,9 +120,10 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
 |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
 |forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
-|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](./design/index.html). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. This flag cannot be used with either `appendToExisting` of IOConfig or `forceExtendableShardSpecs`. For more details, see the below __Segment publishing modes__ section.|false|no|
+|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../design/index.html). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. This flag cannot be used with either `appendToExisting` of IOConfig or `forceExtendableShardSpecs`. For more details, see the below __Segment publishing modes__ section.|false|no|
```
Thanks for fixing.
```java
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class ByteBufferOutputBytes extends OutputBytes
```
Would you add unit tests for the HeapByteBufferOutputBytes and DirectByteBufferOutputBytes?
Added tests
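For reference, a minimal sketch of such a test; it assumes OutputBytes extends OutputStream (so `write(byte[])` is available) and exposes `readFully(long, ByteBuffer)` as described later in this thread:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Assert;
import org.junit.Test;

public class HeapByteBufferOutputBytesTest
{
  @Test
  public void testWriteThenReadBack() throws IOException
  {
    HeapByteBufferOutputBytes outputBytes = new HeapByteBufferOutputBytes();
    byte[] data = {1, 2, 3, 4};
    outputBytes.write(data);

    // read the bytes back from position 0 and compare
    ByteBuffer readBuffer = ByteBuffer.allocate(data.length);
    outputBytes.readFully(0, readBuffer);
    readBuffer.flip();

    byte[] roundTripped = new byte[data.length];
    readBuffer.get(roundTripped);
    Assert.assertArrayEquals(data, roundTripped);
  }
}
```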
```java
public abstract class ByteBufferOutputBytes extends OutputBytes
{
  static final int BUFFER_SIZE = 64 * 1024;
```
Out of curiosity, is there any reason for 64K buffer size?
If you have one, maybe it's better to add a comment.
Added comment (there is no reason)
```java
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

final class FileOutputBytes extends OutputBytes
```
Please add some unit tests.
Added
Still reviewing. Reviewed up to StringDimensionMergerV9.
```java
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

public interface Serializer
```
Would you add some javadocs?
Added
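For context, a sketch of what the documented interface might look like; the method set is inferred from `getSerializedSize()` and `writeToChannel(...)` quoted elsewhere in this thread, not copied from the actual file:

```java
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

public interface Serializer
{
  /**
   * Returns the number of bytes that writeToChannel() would emit.
   * Implementations may treat this call as "freezing" the data.
   */
  long getSerializedSize() throws IOException;

  /**
   * Writes the serialized form to the given channel. FileSmoosher is Druid's
   * multi-file segment writer (import omitted here); the exact parameter list
   * is an assumption.
   */
  void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
}
```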
```diff
    * @param capabilities The ColumnCapabilities of the dimension represented by this DimensionHandler
    * @param progress ProgressIndicator used by the merging process
    *
    * @return A new DimensionMergerV9 object.
    */
   DimensionMergerV9<EncodedKeyComponentType> makeMerger(
       IndexSpec indexSpec,
       File outDir,
-      IOPeon ioPeon,
+      OutputMedium outputMedium,
```
Please update javadoc.
Updated
```diff
   }
   catch (IOException ioe) {
     throw new RuntimeException(ioe);
   }
 }

-protected void setupEncodedValueWriter() throws IOException
+protected void setupEncodedValueWriter(OutputMedium outputMedium) throws IOException
```
Can be private.
Changed
```diff
 @ExtensionPoint
-public interface GenericColumnSerializer extends Closeable
+public interface GenericColumnSerializer extends Serializer
 {
   public void open() throws IOException;
```
Please remove the public modifier.
@jihoonson could you please add an IntelliJ/Checkstyle/PMD rule that prohibits unnecessary qualifiers in interfaces?
Ok. I'll make a PR soon.
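For what it's worth, Checkstyle ships a built-in check for this; a sketch of the relevant checkstyle.xml fragment:

```xml
<!-- RedundantModifier flags modifiers that are implicit,
     e.g. "public" on interface methods -->
<module name="Checker">
  <module name="TreeWalker">
    <module name="RedundantModifier"/>
  </module>
</module>
```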
```diff
@@ -80,17 +73,19 @@
   protected String dimensionName;
   protected GenericIndexedWriter<String> dictionaryWriter;
   protected List<String> dictionary;
   protected String firstDictionaryValue;
   protected int dictionarySize;
```
These variables can be private.
Also, all variables in this class can be private as well. Would you please fix it too?
Changed all
```java
  protected String dimensionName;
  protected GenericIndexedWriter<String> dictionaryWriter;
  protected List<String> dictionary;
```
This variable looks to be used only for spatial indexes. Please leave some comments.
Added comment
```java
  if (hasSpatial) {
    dictionary.add(value);
  }
  dictionarySize++;
```
Lines 191 to 198 duplicate lines 170 to 179. Please extract them into a method.
Extracted
```java
  dictionaryWriter = new GenericIndexedWriter<>(outputMedium, dictFilename, GenericIndexed.STRING_STRATEGY);
  boolean hasSpatial = capabilities.hasSpatialIndexes();
  if (hasSpatial) {
    dictionary = new ArrayList<>();
```
This was an off-heap mmapped buffer previously, but now it's an on-heap buffer. There will be some issues with memory, and I'm not sure this is better even though the disk write/read is removed.
Implemented an optimization: the spatial index now reuses dictionaryWriter.
```java
  private int metaSize()
  {
    return 1 + 4 + 4 + 1;
```
Please use Integer.BYTES.
Refactored, as well as in other places
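A sketch of what the refactored method might look like; only the 1 + 4 + 4 + 1 layout comes from the snippet above, the meaning of each term is a hypothetical illustration:

```java
private int metaSize()
{
  // hypothetical breakdown: a version byte, two int fields, and a flags byte
  return Byte.BYTES + Integer.BYTES + Integer.BYTES + Byte.BYTES;
}
```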
```java
  }

  @Override
  public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
  // ...
  private void writeLastOffset() throws IOException
```
It looks like once this method is called, subsequent allValues() calls are not valid anymore. Please add comments about this somewhere.
Is this the same for all other interfaces or classes implementing Serializer? If so, all methods that modify data should check whether they are called after this or after getSerializedSize() is called.
Added checks
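A minimal sketch of the kind of guard described above, with hypothetical class and field names:

```java
import java.io.IOException;

// once getSerializedSize() is called, further modification is rejected
abstract class GuardedSerializer
{
  private boolean closedForWrite = false;

  public final long getSerializedSize() throws IOException
  {
    closedForWrite = true; // data must not be modified past this point
    return computeSerializedSize();
  }

  // every mutating method calls this first
  protected final void checkOpenForWrite()
  {
    if (closedForWrite) {
      throw new IllegalStateException("Modification after getSerializedSize() was called");
    }
  }

  protected abstract long computeSerializedSize() throws IOException;
}
```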
```java
    out.flip();
  }
  catch (IOException e) {
    log.error(e, "Error decompressing data");
```
Is it fine to log instead of rethrowing?
Changed to rethrowing
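That is, something along these lines; `Decompressor` is a hypothetical stand-in for whatever performs the actual decompression in the snippet above:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

interface Decompressor
{
  void decompress(ByteBuffer in, ByteBuffer out) throws IOException;
}

class DecompressHelper
{
  static void decompress(Decompressor decompressor, ByteBuffer in, ByteBuffer out)
  {
    try {
      decompressor.decompress(in, out);
      out.flip();
    }
    catch (IOException e) {
      // rethrow instead of only logging, so callers can't silently read corrupt data
      throw new RuntimeException("Error decompressing data", e);
    }
  }
}
```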
```java
  ++numWritten;
  SerializerUtils.writeBigEndianIntToOutputStream(valuesOut, bytesToWrite.length, sizeHelperBuffer);
  valuesOut.write(bytesToWrite);
  valuesOut.writeInt(0);
```
What does writing a 0 mean?
Added comment
```diff
 }

-private void closeMultiFiles() throws IOException
+private void closeMultiFiles(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
```
Please rename properly.
Renamed
```java
  long previousValuePosition = 0;
  int bagSize = 1 << bagSizePower;

  int numberOfFilesRequired = GenericIndexed.getNumberOfFilesRequired(bagSize, numWritten);
```
Please add some comments about what's going on here.
Not sure what I should comment here
I finished my first-pass review. I'll do another pass in a couple of days.
@jihoonson did you have a chance to look at this?
```java
 * Reads bytes from the byte sequences, represented by this OutputBytes, at the random position, into the given
 * buffer.
 *
 * @throws RuntimeException if the byte sequences from the given pos ends before all bytes are read
```
BufferUnderflowException?
Maybe "before the given buffer is filled"?
Would you please add a test for the case when BufferUnderflowException occurs as well?
Fixed, added tests
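A sketch of the underflow test asked for above, assuming `readFully(long, ByteBuffer)` throws BufferUnderflowException when the written bytes end before the buffer is filled:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import org.junit.Test;

public class ReadFullyUnderflowTest
{
  @Test(expected = BufferUnderflowException.class)
  public void testReadFullyPastEndThrows() throws Exception
  {
    HeapByteBufferOutputBytes outputBytes = new HeapByteBufferOutputBytes();
    outputBytes.write(new byte[]{1, 2, 3});

    // ask for more bytes than were written: should underflow
    ByteBuffer tooBig = ByteBuffer.allocate(10);
    outputBytes.readFully(0, tooBig);
  }
}
```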
@b-slim do you have more comments here?
@leventov I did not finish the review and am out of office for a personal vacation; please do not block this PR on my review.
```java
  public ByteBufferInputStream(ByteBuffer buffer)
  {
    this.buffer = buffer;
```
Should this be a read-only copy?
It's more flexible, because it allows the caller to make a copy (or not) before the constructor call, depending on their needs. Added a javadoc comment.
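For instance, a caller that wants isolation can pass a read-only view using the standard java.nio API:

```java
import java.nio.ByteBuffer;

class ReadOnlyExample
{
  static ByteBufferInputStream wrapReadOnly(ByteBuffer buffer)
  {
    // asReadOnlyBuffer() gives an independent, unmodifiable view with its own
    // position, so the stream cannot mutate the caller's buffer contents
    return new ByteBufferInputStream(buffer.asReadOnlyBuffer());
  }
}
```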
@b-slim could you please take another look?
@b-slim if you are OK with the design of this PR, maybe we could merge it?
Reviewed 116 of 234 files at r1, 41 of 81 files at r2.
Reviewed 56 of 234 files at r1, 32 of 81 files at r2.

processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java, line 92 at r2 (raw file): wondering what the benefits of IntArrayList vs ArrayList are?

processing/src/main/java/io/druid/segment/serde/ComplexColumnSerializer.java, line 50 at r2 (raw file): this needs to be called out, since it is public.

processing/src/main/java/io/druid/segment/writeout/FileWriteOutBytes.java, line 35 at r2 (raw file): Wondering if we can reuse other projects' code for ByteBuffer IO; this pattern is fairly common, and usually this kind of code can have some common bugs that are hard to see.
Reviewed 8 of 234 files at r1, 7 of 81 files at r2, 2 of 2 files at r3.
@leventov sorry that this took so long; it was a very good PR and thus very hard to review!
@b-slim thanks for taking the time to review this PR!

Memory savings: IntArrayList stores primitive ints in a plain int[], avoiding a boxed Integer object per element.

To be called out where? I think I made this factory method

As I mentioned here: #4762 (comment), I don't think this functionality could be used somewhere else in Druid. Something similar is possibly needed only when implementing spilling in groupBy.
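To illustrate the memory-savings point above, a small sketch (fastutil's IntArrayList; the figures in the comments are rough):

```java
import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.util.ArrayList;
import java.util.List;

public class IntArrayListVsArrayList
{
  public static void main(String[] args)
  {
    // IntArrayList is backed by a plain int[]: ~4 bytes per element
    IntArrayList primitiveInts = new IntArrayList();
    primitiveInts.add(42); // stored directly, no boxing

    // ArrayList<Integer> holds references to boxed Integer objects,
    // costing roughly 4-5x the memory plus extra GC pressure
    List<Integer> boxedInts = new ArrayList<>();
    boxedInts.add(42); // autoboxed
  }
}
```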
@gianm @b-slim @drcrallen FYI, I included an important note about the downgrade baseline at the top of the first comment in this PR.
Important, for release notes: downgrade from the Druid version that includes this PR is possible only to a version that includes #4824, i. e. Druid 0.11.0, but not earlier.

This PR consists of three sets of fairly independent changes:

- Replace `IOPeon` with `SegmentWriteOutMedium`

Unfortunately, the way those changes were developed doesn't allow splitting them into independent PRs at this point in a way that passes tests (I tried). I apologise for the size of this PR and will try to avoid this situation in the future.
Replace `IOPeon` with `SegmentWriteOutMedium`

The `IOPeon` interface was used in many serializers and writers to temporarily store the "main volume" of the data. There was a single implementation, `TmpFileIOPeon`, that created a temporary file for each stream of data in the task working directory.

This interface is replaced with `SegmentWriteOutMedium`: `WriteOutBytes` is an abstraction of an appendable byte stream that is readable and writable at any time. This interface change allowed simplifying many serializers and writers, and they don't need to be `Closeable` anymore.

Also, an off-heap memory-based implementation of `SegmentWriteOutMedium` is added, along with the tmpFile-based one. Everyone who has enough memory on their servers should use this type of `SegmentWriteOutMedium`. Note that it may require changing the `-XX:MaxDirectMemorySize` JVM parameter. A sketch of the interfaces follows.
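For a sense of the shape of these interfaces, a minimal sketch; the actual Druid interfaces have more methods, and the signatures here are inferred from this PR's discussion rather than copied from the code:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// hands out WriteOutBytes streams; closing the medium frees all of them
interface SegmentWriteOutMedium extends Closeable
{
  WriteOutBytes makeWriteOutBytes() throws IOException;
}

// an appendable byte stream that stays readable while it is being written
abstract class WriteOutBytes extends OutputStream
{
  /** Number of bytes written so far. */
  public abstract long size() throws IOException;

  /** Copies everything written so far to the given channel. */
  public abstract void writeTo(WritableByteChannel channel) throws IOException;

  /** Reads bytes at a random position into the given buffer. */
  public abstract void readFully(long pos, ByteBuffer buffer) throws IOException;
}
```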
The type of `SegmentWriteOutMedium` to use can be configured per-task (in this PR this is implemented for the most relevant types of tasks, except Hadoop tasks), via a parameter called `segmentWriteOutMediumFactory`, with options `tmpFile` and `offHeapMemory`, somewhere in "tuningConfig".

Also, a generic `druid.defaultSegmentWriteOutMediumFactory` configuration is added, to select the `SegmentWriteOutMedium` for tasks in which `segmentWriteOutMediumFactory` is not specified. It should work in Hadoop as well. The default value is `tmpFile`, for backward compatibility. Illustrative examples of both forms follow.
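For illustration, sketches of both configuration forms; the exact JSON shape of the factory object and the property spelling are assumptions based on the names above:

```json
"tuningConfig": {
  "type": "index",
  "segmentWriteOutMediumFactory": { "type": "offHeapMemory" }
}
```

and cluster-wide, in runtime.properties:

```properties
# used when a task doesn't specify segmentWriteOutMediumFactory
druid.defaultSegmentWriteOutMediumFactory=tmpFile
```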
Refactoring of buffer compression, remove unnecessary data copy

Buffer compression is simplified. Unnecessary data copy is removed at at least two points:

- `CompressedObjectStrategy.toBytes()` (this method is removed and not used anymore, because the code that used it was optimized)