
Replace IOPeon with SegmentWriteOutMedium; Improve buffer compression #4762

Merged

Conversation


@leventov leventov commented Sep 8, 2017

Important, for release notes: downgrade from a Druid version that includes this PR is possible only to a version that also includes #4824, i.e. Druid 0.11.0, but not earlier.


This PR consists of three sets of fairly independent changes:

  • Replace IOPeon with SegmentWriteOutMedium
  • Refactoring of buffer compression, remove unnecessary data copy
  • Replace some boxing collections with fastutil, and similar things in the serialization part of the codebase

Unfortunately, the way these changes were developed doesn't allow splitting them into independent PRs at this point while keeping the tests passing (I tried). I apologize for the size of this PR and will try to avoid this situation in the future.

Replace IOPeon with SegmentWriteOutMedium

The IOPeon interface was used in many serializers and writers to temporarily store the "main volume" of the data:

interface IOPeon {
  OutputStream makeOutputStream(String filename);
  InputStream makeInputStream(String filename);
}

There was a single implementation, TmpFileIOPeon, which created a temporary file for each stream of data in the task working directory.

This interface is replaced with SegmentWriteOutMedium:

interface SegmentWriteOutMedium {
  WriteOutBytes makeWriteOutBytes();
}

abstract class WriteOutBytes extends OutputStream implements WritableByteChannel {
  abstract long size();
  abstract void writeTo(WritableByteChannel channel);
  abstract InputStream asInputStream();
  abstract void readFully(long pos, ByteBuffer buffer);
}

WriteOutBytes is an abstraction of an appendable byte stream that is readable and writable at any time.
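To make that contract concrete, here is a hypothetical, heap-backed sketch of a WriteOutBytes-like class (JDK-only; Druid's actual implementations are backed by 64K ByteBuffer chunks or a temporary file, and their exact signatures may differ):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;

public class HeapWriteOutBytes extends OutputStream
{
  private byte[] bytes = new byte[64];
  private int size = 0;

  @Override
  public void write(int b)
  {
    ensureCapacity(size + 1);
    bytes[size++] = (byte) b;
  }

  /** Number of bytes written so far. */
  public long size()
  {
    return size;
  }

  /** Streams everything written so far into the given channel. */
  public void writeTo(WritableByteChannel channel) throws IOException
  {
    channel.write(ByteBuffer.wrap(bytes, 0, size));
  }

  public InputStream asInputStream()
  {
    return new ByteArrayInputStream(bytes, 0, size);
  }

  /** Random-access read; valid at any time, even while writing continues. */
  public void readFully(long pos, ByteBuffer buffer)
  {
    if (pos + buffer.remaining() > size) {
      throw new BufferUnderflowException();
    }
    buffer.put(bytes, (int) pos, buffer.remaining());
  }

  private void ensureCapacity(int capacity)
  {
    if (capacity > bytes.length) {
      bytes = Arrays.copyOf(bytes, Math.max(capacity, bytes.length * 2));
    }
  }
}
```

Unlike IOPeon's write-then-reopen model, the same object supports appending and random-access reads at the same time, which is what lets serializers drop their Closeable lifecycle.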

This interface change allowed many serializers and writers to be simplified; they no longer need to be Closeable.

An off-heap-memory-based implementation of SegmentWriteOutMedium is also added, alongside the tmpFile-based one. Anyone who has enough memory on their servers should use this type of SegmentWriteOutMedium. Note that it may require changing the -XX:MaxDirectMemorySize JVM parameter.

The type of SegmentWriteOutMedium to use can be configured per task (in this PR this is implemented for the most relevant task types, except Hadoop tasks) via a parameter called segmentWriteOutMediumFactory, with options tmpFile and offHeapMemory, somewhere in the "tuningConfig".

A generic druid.defaultSegmentWriteOutMediumFactory configuration is also added, to select the SegmentWriteOutMedium for tasks in which segmentWriteOutMediumFactory is not specified. It should work in Hadoop as well. The default value is tmpFile, for backward compatibility.
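For illustration only (property and field names follow the description above; the exact JSON shape of segmentWriteOutMediumFactory is an assumption and may differ in the final docs), a task tuningConfig selecting the off-heap medium might look like:

```json
{
  "tuningConfig": {
    "type": "index",
    "segmentWriteOutMediumFactory": { "type": "offHeapMemory" }
  }
}
```

while the cluster-wide fallback would be set in the runtime properties, e.g. `druid.defaultSegmentWriteOutMediumFactory=tmpFile`.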

Refactoring of buffer compression, remove unnecessary data copy

Buffer compression is simplified. Unnecessary data copies are removed in at least two places:

  • CompressedObjectStrategy.toBytes() (this method is removed, because the code that used it was optimized and no longer needs it)
  • LZ4 compression now uses only direct buffers on both the input and output sides, which allows the native implementation to use the provided buffers instead of allocating another buffer internally.
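The direct-buffer point can be illustrated with the JDK's Deflater standing in for LZ4 (a sketch only; Druid's actual LZ4 path uses lz4-java, not java.util.zip, and the ByteBuffer overloads of setInput/deflate require Java 11+). With allocateDirect buffers on both sides, the native code reads and writes the off-heap memory in place instead of copying through an internal scratch array:

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class DirectBufferCompression
{
  /** Compresses then decompresses 64K of repeated bytes using only direct buffers. */
  public static boolean roundTrips()
  {
    ByteBuffer input = ByteBuffer.allocateDirect(64 * 1024);
    while (input.hasRemaining()) {
      input.put((byte) 'a'); // highly compressible payload
    }
    input.flip();

    ByteBuffer compressed = ByteBuffer.allocateDirect(64 * 1024);
    Deflater deflater = new Deflater();
    deflater.setInput(input); // native code reads the direct buffer in place
    deflater.finish();
    deflater.deflate(compressed);
    deflater.end();
    compressed.flip();

    ByteBuffer decompressed = ByteBuffer.allocateDirect(64 * 1024);
    Inflater inflater = new Inflater();
    inflater.setInput(compressed);
    try {
      inflater.inflate(decompressed);
    }
    catch (DataFormatException e) {
      return false;
    }
    inflater.end();
    decompressed.flip();

    // The block compressed well and decompressed back to the original size.
    return compressed.limit() < 1024 && decompressed.limit() == 64 * 1024;
  }

  public static void main(String[] args)
  {
    System.out.println(roundTrips());
  }
}
```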

@leventov leventov removed the WIP label Sep 19, 2017
@leventov

Also, this PR no longer always compresses blocks of the full block size (which is 64K): if the leftover, or the whole column (e.g. if the segment is small), is smaller than that, a smaller block is compressed. This doesn't contradict the spec, but it is not supported by the current implementation; that is fixed in #4824. I think that if #4824 appears in Druid 0.11, it will be possible to apply this PR without bumping the segment version.

Removed the WIP tag; this PR is ready for review.

@leventov

@jihoonson could you please review this PR?

@jihoonson

Sure, I'll review soon.

@jihoonson left a comment:

Reviewed until OutputMedium.


|Property|Description|Default|
|--------|-----------|-------|
|`druid.defaultOutputMediumFactory`|`tmpFile` or `offHeapMemory`, see explanation above|`tmpFile`|
Contributor:
Maybe druid.peon.defaultOutputMediumFactory is better, because this configuration is used by peons.

Member Author:
This config could be used not only on peons, e.g. in Hadoop/Spark tasks.

Contributor:
From the perspective of Druid, Hadoop/Spark tasks are just some of the task types that use external systems for indexing, but they are executed by peons. I think it doesn't matter how this configuration is used outside of Druid.

Contributor:
I forgot this comment. @leventov any thoughts?

Member Author:
Changed to druid.peon.defaultOutputMediumFactory

@@ -120,9 +120,10 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](./design/index.html). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. This flag cannot be used with either `appendToExisting` of IOConfig or `forceExtendableShardSpecs`. For more details, see the below __Segment publishing modes__ section.|false|no|
|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../design/index.html). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. This flag cannot be used with either `appendToExisting` of IOConfig or `forceExtendableShardSpecs`. For more details, see the below __Segment publishing modes__ section.|false|no|
Contributor:
Thanks for fixing.

import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class ByteBufferOutputBytes extends OutputBytes
Contributor:
Would you add unit tests for the HeapByteBufferOutputBytes and DirectByteBufferOutputBytes?

Member Author:
Added tests


public abstract class ByteBufferOutputBytes extends OutputBytes
{
static final int BUFFER_SIZE = 64 * 1024;
Contributor:
Out of curiosity, is there any reason for 64K buffer size?

Contributor:
If you have one, maybe it's better to add a comment.

Member Author:
Added comment (there is no reason)

import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

final class FileOutputBytes extends OutputBytes
Contributor:
Please add some unit tests.

Member Author:
Added

@jihoonson left a comment:
Still reviewing. Reviewed up to StringDimensionMergerV9.

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

public interface Serializer
Contributor:
Would you add some java docs?

Member Author:
Added

* @param capabilities The ColumnCapabilities of the dimension represented by this DimensionHandler
* @param progress ProgressIndicator used by the merging process

* @return A new DimensionMergerV9 object.
*/
DimensionMergerV9<EncodedKeyComponentType> makeMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
OutputMedium outputMedium,
Contributor:
Please update javadoc.

Member Author:
Updated

}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

protected void setupEncodedValueWriter() throws IOException
protected void setupEncodedValueWriter(OutputMedium outputMedium) throws IOException
Contributor:
Can be private.

Member Author:
Changed


@ExtensionPoint
public interface GenericColumnSerializer extends Closeable
public interface GenericColumnSerializer extends Serializer
{
public void open() throws IOException;
Contributor:
Please remove the public modifier.

Member Author:
@jihoonson could you please add an IntelliJ/Checkstyle/PMD rule that prohibits unnecessary qualifiers in interfaces?

Contributor:
Ok. I'll make a PR soon.

@@ -80,17 +73,19 @@

protected String dimensionName;
protected GenericIndexedWriter<String> dictionaryWriter;
protected List<String> dictionary;
protected String firstDictionaryValue;
protected int dictionarySize;
Contributor:
These variables can be private.

Contributor:
Also, all variables in this class can be private as well. Would you please fix it too?

Member Author:
Changed all

@@ -80,17 +73,19 @@

protected String dimensionName;
protected GenericIndexedWriter<String> dictionaryWriter;
protected List<String> dictionary;
Contributor:
This variable looks like it's used only for spatial indexes. Please leave a comment about that.

Member Author:
Added comment

if (hasSpatial) {
dictionary.add(value);
}
dictionarySize++;
Contributor:
Lines 191 to 198 duplicate lines 170 to 179. Please extract a method.

Member Author:
Extracted

dictionaryWriter = new GenericIndexedWriter<>(outputMedium, dictFilename, GenericIndexed.STRING_STRATEGY);
boolean hasSpatial = capabilities.hasSpatialIndexes();
if (hasSpatial) {
dictionary = new ArrayList<>();
Contributor:
This was an off-heap mmapped buffer previously, but now it's an on-heap buffer. There will be some issues with memory, and I'm not sure this is better even though a disk write/read is removed.

Member Author:
Implemented an optimization; the spatial index now reuses dictionaryWriter.

)
private int metaSize()
{
return 1 + 4 + 4 + 1;
Contributor:
Please use Integer.BYTES.

Member Author (Oct 4, 2017):
Refactored, as well as in other places

}

@Override
public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
private void writeLastOffset() throws IOException
Contributor:
It looks like once this method is called, subsequent calls to allValues() are no longer valid. Please add a comment about this somewhere.

Contributor:
Is this the same for all other interfaces or classes implementing Serializer? If so, all methods modifying data should check that they are not called after this or getSerializedSize() is called.

Member Author:
Added checks

out.flip();
}
catch (IOException e) {
log.error(e, "Error decompressing data");
Contributor:
Is it fine to log instead of rethrowing?

Member Author:
Changed to rethrowing

++numWritten;
SerializerUtils.writeBigEndianIntToOutputStream(valuesOut, bytesToWrite.length, sizeHelperBuffer);
valuesOut.write(bytesToWrite);
valuesOut.writeInt(0);
Contributor:
What does writing a 0 mean?

Member Author:
Added comment

}

private void closeMultiFiles() throws IOException
private void closeMultiFiles(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
Contributor:
Please rename properly.

Member Author:
Renamed

long previousValuePosition = 0;
int bagSize = 1 << bagSizePower;

int numberOfFilesRequired = GenericIndexed.getNumberOfFilesRequired(bagSize, numWritten);
Contributor:
Please add some comments explaining what's going on here.

Member Author:
Not sure what I should comment here

@jihoonson

I finished my first-pass review. I'll do another pass in a couple of days.

@leventov

@jihoonson did you have a chance to look at this?

* Reads bytes from the byte sequences, represented by this OutputBytes, at the random position, into the given
* buffer.
*
* @throws RuntimeException if the byte sequences from the given pos ends before all bytes are read
Contributor:
BufferUnderflowException?

Contributor:
Maybe "before the given buffer is filled"?

Contributor:
Would you please add a test for the case when BufferUnderflowException occurs as well?

Member Author:
Fixed, added tests

@leventov leventov changed the title Replace IOPeon with OutputMedium; Improve buffer compression Replace IOPeon with SegmentWriteOutMedium; Improve buffer compression Oct 19, 2017
@leventov

@b-slim do you have more comments here?

@b-slim

b-slim commented Oct 31, 2017

@leventov I did not finish the review and I am out of office on personal vacation; please do not block this PR on my review.


public ByteBufferInputStream(ByteBuffer buffer)
{
this.buffer = buffer;
Contributor:
Should this be a read-only copy?

Member Author:
It's more flexible, because it allows making (or not making) a copy before the constructor call, depending on the needs. Added a javadoc comment.

@leventov

leventov commented Nov 8, 2017

@b-slim could you please take another look?

@leventov

@b-slim if you are ok about the design of this PR, maybe we could merge it?

@b-slim

b-slim commented Nov 26, 2017

Reviewed 116 of 234 files at r1, 41 of 81 files at r2.
Review status: 134 of 236 files reviewed at latest revision, all discussions resolved.


Comments from Reviewable

@gianm

gianm commented Nov 28, 2017

@leventov @b-slim does this patch need any further review or is it ready to commit?

@b-slim

b-slim commented Nov 30, 2017

Reviewed 56 of 234 files at r1, 32 of 81 files at r2.
Review status: 219 of 236 files reviewed at latest revision, 3 unresolved discussions.


processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java, line 92 at r2 (raw file):

  {
    Iterator<IndexedInts> objects = objectsIterable.iterator();
    IntArrayList offsetList = new IntArrayList();

Wondering what the benefits of IntArrayList vs. ArrayList are?


processing/src/main/java/io/druid/segment/serde/ComplexColumnSerializer.java, line 50 at r2 (raw file):

  }

  @PublicApi

This needs to be called out since it is public.


processing/src/main/java/io/druid/segment/writeout/FileWriteOutBytes.java, line 35 at r2 (raw file):

import java.nio.channels.WritableByteChannel;

final class FileWriteOutBytes extends WriteOutBytes

Wondering if we can reuse code from other projects for ByteBuffer I/O; this pattern is fairly common, and this kind of code usually has subtle bugs that are hard to see.


@b-slim

b-slim commented Dec 5, 2017

Reviewed 8 of 234 files at r1, 7 of 81 files at r2, 2 of 2 files at r3.
Review status: all files reviewed at latest revision, 3 unresolved discussions.


@b-slim b-slim merged commit a7a6a04 into apache:master Dec 5, 2017
@b-slim

b-slim commented Dec 5, 2017

@leventov sorry this took so long; it was a very good PR and thus very hard to review!

@leventov leventov deleted the output-medium-and-compression-improvements branch December 5, 2017 12:02
@leventov

leventov commented Dec 5, 2017

@b-slim thanks for taking time to review this PR!

Wondering what the benefits of IntArrayList vs. ArrayList are?

Memory savings: IntArrayList uses an int[] internally instead of an Object[] of boxed Integers.
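As a rough back-of-the-envelope illustration of that saving (JDK-only sketch; the per-object numbers are JVM-dependent assumptions, taking compressed 4-byte references and 16-byte Integer objects):

```java
public class BoxingFootprint
{
  /** Approximate bytes used by the backing storage of an ArrayList<Integer> of n elements. */
  static long boxedBytes(int n)
  {
    return n * (4L /* reference in Object[] */ + 16L /* Integer object */);
  }

  /** Approximate bytes used by the int[] inside an IntArrayList of n elements. */
  static long primitiveBytes(int n)
  {
    return n * 4L;
  }

  public static void main(String[] args)
  {
    int n = 1_000_000;
    System.out.println(boxedBytes(n));     // 20000000: ~20 MB of storage
    System.out.println(primitiveBytes(n)); // 4000000: ~4 MB, a 5x saving
  }
}
```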

This needs to be called out since it is public.

Called out where? I think I made this factory method @PublicApi to suppress the warning about it being unnecessarily public instead of package-private. As far as I remember, it was mentioned somewhere that this serde API should be considered public.

Wondering if we can reuse code from other projects for ByteBuffer I/O; this pattern is fairly common, and this kind of code usually has subtle bugs that are hard to see.

As I mentioned here: #4762 (comment), I don't think this functionality could be used anywhere else in Druid. Something similar may be needed only when implementing spilling in groupBy.

@leventov

leventov commented Dec 5, 2017

@gianm @b-slim @drcrallen FYI, I included an important note about the downgrade baseline at the top of the first comment in this PR.
