Replace IOPeon with SegmentWriteOutMedium; Improve buffer compression #4762

Merged

Changes from 8 commits
30 commits:
506ef5a  Replace IOPeon with OutputMedium; Improve compression (leventov, Sep 2, 2017)
dab2feb  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Sep 14, 2017)
03ca973  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Sep 19, 2017)
e4fac07  Fix test (leventov, Sep 19, 2017)
174ffb7  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Sep 20, 2017)
51b09a3  Cleanup CompressionStrategy (leventov, Sep 20, 2017)
41f3248  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Sep 21, 2017)
bb68d21  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Sep 22, 2017)
906bf84  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Oct 2, 2017)
6281695  Javadocs (leventov, Oct 2, 2017)
43a3b9f  Add OutputBytesTest (leventov, Oct 2, 2017)
7e10873  Address comments (leventov, Oct 4, 2017)
98d602a  Random access in OutputBytes and GenericIndexedWriter (leventov, Oct 4, 2017)
3e9b6a2  Fix bugs (leventov, Oct 4, 2017)
baf90e7  Fixes (leventov, Oct 11, 2017)
9046e7a  Test OutputBytes.readFully() (leventov, Oct 11, 2017)
85bb813  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Oct 13, 2017)
ab69f12  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Oct 16, 2017)
345fc00  Address comments (leventov, Oct 17, 2017)
bbcaa6f  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Oct 17, 2017)
3aec7cc  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Oct 18, 2017)
0300e78  Rename OutputMedium to SegmentWriteOutMedium and OutputBytes to Write… (leventov, Oct 19, 2017)
18a889a  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Oct 19, 2017)
75a5725  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Oct 30, 2017)
a058e16  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Nov 7, 2017)
832ca66  Add comments to ByteBufferInputStream (leventov, Nov 7, 2017)
aa1eb2e  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Nov 20, 2017)
66378a1  Remove unused declarations (leventov, Nov 21, 2017)
2479fa1  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Nov 21, 2017)
994685f  Merge remote-tracking branch 'upstream/master' into output-medium-and… (leventov, Nov 27, 2017)
@@ -19,12 +19,13 @@

 package io.druid.benchmark;

-import com.google.common.primitives.Ints;
-import io.druid.segment.data.CompressedObjectStrategy;
+import io.druid.java.util.common.io.Closer;
 import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
+import io.druid.segment.data.CompressionStrategy;
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.data.VSizeIndexedInts;
 import io.druid.segment.data.WritableSupplier;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Mode;
@@ -69,25 +70,20 @@ public void setup() throws IOException
     }
     final ByteBuffer bufferCompressed = serialize(
         CompressedVSizeIntsIndexedSupplier.fromList(
-            Ints.asList(vals),
+            IntArrayList.wrap(vals),
             bound - 1,
             CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(bytes),
-            ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4
+            ByteOrder.nativeOrder(),
+            CompressionStrategy.LZ4,
+            Closer.create()
        )
     );
     this.compressed = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
         bufferCompressed,
-        ByteOrder.nativeOrder(),
-        null
+        ByteOrder.nativeOrder()
     ).get();

-    final ByteBuffer bufferUncompressed = serialize(
-        new VSizeIndexedInts.VSizeIndexedIntsSupplier(
-            VSizeIndexedInts.fromArray(
-                vals
-            )
-        )
-    );
+    final ByteBuffer bufferUncompressed = serialize(VSizeIndexedInts.fromArray(vals));
     this.uncompressed = VSizeIndexedInts.readFromByteBuffer(bufferUncompressed);

     filter = new BitSet();
@@ -128,7 +124,7 @@ public void close() throws IOException
       }
     };

-    writableSupplier.writeToChannel(channel);
+    writableSupplier.writeTo(channel, null);
     buffer.rewind();
     return buffer;
   }

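For reference, the serialize(...) helper these benchmarks call collects the supplier's output into a ByteBuffer through a throwaway channel; only the channel's close() and the final three lines are visible in the hunk above. A hedged reconstruction, where sizing the buffer via getSerializedSize() is an assumption:

```java
// Hedged reconstruction of the benchmark's serialize(...) helper; the buffer
// sizing and write loop are inferred, the tail mirrors the hunk above.
private static ByteBuffer serialize(WritableSupplier<?> writableSupplier) throws IOException
{
  final ByteBuffer buffer = ByteBuffer.allocate((int) writableSupplier.getSerializedSize());

  WritableByteChannel channel = new WritableByteChannel()
  {
    @Override
    public int write(ByteBuffer src)
    {
      // Copy the supplier's output into the target buffer.
      int size = src.remaining();
      buffer.put(src);
      return size;
    }

    @Override
    public boolean isOpen()
    {
      return true;
    }

    @Override
    public void close()
    {
    }
  };

  writableSupplier.writeTo(channel, null);
  buffer.rewind();
  return buffer;
}
```
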
@@ -22,8 +22,9 @@
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import io.druid.java.util.common.io.Closer;
 import io.druid.segment.CompressedVSizeIndexedSupplier;
-import io.druid.segment.data.CompressedObjectStrategy;
+import io.druid.segment.data.CompressionStrategy;
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.data.IndexedMultivalue;
 import io.druid.segment.data.VSizeIndexed;
@@ -95,7 +96,9 @@ public IndexedInts apply(int[] input)
           }
         ),
         bound - 1,
-        ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4
+        ByteOrder.nativeOrder(),
+        CompressionStrategy.LZ4,
+        Closer.create()
       )
     );
     this.compressed = CompressedVSizeIndexedSupplier.fromByteBuffer(
@@ -117,7 +120,7 @@ public VSizeIndexedInts apply(int[] input)
             }
           }
         )
-      ).asWritableSupplier()
+      )
     );
     this.uncompressed = VSizeIndexed.readFromByteBuffer(bufferUncompressed);

@@ -159,7 +162,7 @@ public void close() throws IOException
       }
     };

-    writableSupplier.writeToChannel(channel);
+    writableSupplier.writeTo(channel, null);
     buffer.rewind();
     return buffer;
   }

@@ -36,6 +36,7 @@
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.js.JavaScriptConfig;
+import io.druid.output.OffHeapMemoryOutputMediumFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.extraction.ExtractionFn;
@@ -132,6 +133,7 @@ public class FilterPartitionBenchmark
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
+        OffHeapMemoryOutputMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
@@ -141,7 +143,7 @@ public int columnCacheSizeBytes()
           }
         }
     );
-    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemoryOutputMediumFactory.instance());
   }

   @Setup
@@ -178,7 +180,8 @@ public void setup() throws IOException
     indexFile = INDEX_MERGER_V9.persist(
         incIndex,
         tmpDir,
-        new IndexSpec()
+        new IndexSpec(),
+        null
     );
     qIndex = INDEX_IO.loadIndex(indexFile);

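The same two wiring changes recur across the benchmark classes: IndexIO and IndexMergerV9 now take an OutputMediumFactory at construction, and persist(...) gains a trailing per-call factory argument, where null means "use the merger's default". Condensed from the hunks above; JSON_MAPPER, incIndex, tmpDir, and the cache-size value stand in for the benchmark's own fields:

```java
// The constructor/persist wiring repeated in these benchmarks; field names and
// the columnCacheSizeBytes value are placeholders for collapsed context.
IndexIO indexIO = new IndexIO(
    JSON_MAPPER,
    OffHeapMemoryOutputMediumFactory.instance(),  // new parameter in this PR
    new ColumnConfig()
    {
      @Override
      public int columnCacheSizeBytes()
      {
        return 0;  // placeholder; the actual value is collapsed in the diff
      }
    }
);
IndexMergerV9 merger = new IndexMergerV9(
    JSON_MAPPER,
    indexIO,
    OffHeapMemoryOutputMediumFactory.instance()  // the merger's default medium factory
);
// Trailing null: fall back to the merger's default OutputMediumFactory.
File indexFile = merger.persist(incIndex, tmpDir, new IndexSpec(), null);
```
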
@@ -35,6 +35,7 @@
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.js.JavaScriptConfig;
+import io.druid.output.OffHeapMemoryOutputMediumFactory;
 import io.druid.query.Druids;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.Query;
@@ -137,6 +138,7 @@ public class FilteredAggregatorBenchmark
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
+        OffHeapMemoryOutputMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
@@ -146,7 +148,7 @@ public int columnCacheSizeBytes()
           }
         }
     );
-    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemoryOutputMediumFactory.instance());
   }

   @Setup
@@ -203,7 +205,8 @@ public void setup() throws IOException
     indexFile = INDEX_MERGER_V9.persist(
         incIndex,
         tmpDir,
-        new IndexSpec()
+        new IndexSpec(),
+        null
     );
     qIndex = INDEX_IO.loadIndex(indexFile);

@@ -72,7 +72,7 @@ public void setup() throws Exception
     File compFile = new File(dir, file + "-" + strategy);
     rand = new Random();
     ByteBuffer buffer = Files.map(compFile);
-    supplier = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null);
+    supplier = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
   }

   @Benchmark

@@ -20,24 +20,20 @@
 package io.druid.benchmark;

 import com.google.common.collect.ImmutableList;
-import com.google.common.io.ByteSink;
 import io.druid.benchmark.datagen.BenchmarkColumnSchema;
 import io.druid.benchmark.datagen.BenchmarkColumnValueGenerator;
 import io.druid.java.util.common.logger.Logger;
+import io.druid.output.OffHeapMemoryOutputMedium;
 import io.druid.segment.column.ValueType;
-import io.druid.segment.data.CompressedObjectStrategy;
 import io.druid.segment.data.CompressionFactory;
+import io.druid.segment.data.CompressionStrategy;
 import io.druid.segment.data.FloatSupplierSerializer;
-import io.druid.segment.data.TmpFileIOPeon;

 import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.Writer;
 import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
@@ -51,10 +47,10 @@ public class FloatCompressionBenchmarkFileGenerator
 {
   private static final Logger log = new Logger(FloatCompressionBenchmarkFileGenerator.class);
   public static final int ROW_NUM = 5000000;
-  public static final List<CompressedObjectStrategy.CompressionStrategy> compressions =
+  public static final List<CompressionStrategy> compressions =
       ImmutableList.of(
-          CompressedObjectStrategy.CompressionStrategy.LZ4,
-          CompressedObjectStrategy.CompressionStrategy.NONE
+          CompressionStrategy.LZ4,
+          CompressionStrategy.NONE
       );

   private static String dirPath = "floatCompress/";
@@ -143,48 +139,30 @@ public static void main(String[] args) throws IOException, URISyntaxException

     // create compressed files using all combinations of CompressionStrategy and FloatEncoding provided
     for (Map.Entry<String, BenchmarkColumnValueGenerator> entry : generators.entrySet()) {
-      for (CompressedObjectStrategy.CompressionStrategy compression : compressions) {
+      for (CompressionStrategy compression : compressions) {
         String name = entry.getKey() + "-" + compression.toString();
         log.info("%s: ", name);
         File compFile = new File(dir, name);
         compFile.delete();
         File dataFile = new File(dir, entry.getKey());

-        TmpFileIOPeon iopeon = new TmpFileIOPeon(true);
         FloatSupplierSerializer writer = CompressionFactory.getFloatSerializer(
-            iopeon,
+            new OffHeapMemoryOutputMedium(),
             "float",
             ByteOrder.nativeOrder(),
             compression
         );
-        BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8);
-
-        try (FileChannel output = FileChannel.open(
-            compFile.toPath(),
-            StandardOpenOption.CREATE_NEW,
-            StandardOpenOption.WRITE
-        )) {
+        try (
+            BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8);
+            FileChannel output =
+                FileChannel.open(compFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
+        ) {
           writer.open();
           String line;
           while ((line = br.readLine()) != null) {
             writer.add(Float.parseFloat(line));
           }
-          final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          writer.closeAndConsolidate(
-              new ByteSink()
-              {
-                @Override
-                public OutputStream openStream() throws IOException
-                {
-                  return baos;
-                }
-              }
-          );
-          output.write(ByteBuffer.wrap(baos.toByteArray()));
-        }
-        finally {
-          iopeon.close();
-          br.close();
+          writer.writeTo(output, null);
         }
         log.info("%d", compFile.length() / 1024);
       }

@@ -23,10 +23,10 @@
 import com.google.common.primitives.Ints;
 import io.druid.java.util.common.io.smoosh.FileSmoosher;
 import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import io.druid.output.OffHeapMemoryOutputMedium;
 import io.druid.segment.data.GenericIndexed;
 import io.druid.segment.data.GenericIndexedWriter;
 import io.druid.segment.data.ObjectStrategy;
-import io.druid.segment.data.TmpFileIOPeon;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -106,7 +106,7 @@ public int compare(byte[] o1, byte[] o2)
   public void createGenericIndexed() throws IOException
   {
     GenericIndexedWriter<byte[]> genericIndexedWriter = new GenericIndexedWriter<>(
-        new TmpFileIOPeon(),
+        new OffHeapMemoryOutputMedium(),
         "genericIndexedBenchmark",
         byteArrayStrategy
     );
@@ -121,14 +121,13 @@ public void createGenericIndexed() throws IOException
       element.putInt(0, i);
       genericIndexedWriter.write(element.array());
     }
-    genericIndexedWriter.close();
     smooshDir = Files.createTempDir();
     file = File.createTempFile("genericIndexedBenchmark", "meta");

     try (FileChannel fileChannel =
              FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
          FileSmoosher fileSmoosher = new FileSmoosher(smooshDir)) {
-      genericIndexedWriter.writeToChannel(fileChannel, fileSmoosher);
+      genericIndexedWriter.writeTo(fileChannel, fileSmoosher);
     }

     FileChannel fileChannel = FileChannel.open(file.toPath());

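For GenericIndexedWriter the change also removes the explicit close(): the writer stays open until writeTo(...) streams its contents into the channel, smooshing through the FileSmoosher when one is supplied. A minimal sketch of the resulting flow; writer.open() and the sample element are inferred from parts of the benchmark collapsed above, and byteArrayStrategy is the benchmark's own ObjectStrategy field:

```java
// Sketch of the post-change GenericIndexedWriter flow; open() and the sample
// element are inferred, the rest mirrors the hunks above.
GenericIndexedWriter<byte[]> writer = new GenericIndexedWriter<>(
    new OffHeapMemoryOutputMedium(),
    "genericIndexedBenchmark",
    byteArrayStrategy
);
writer.open();
writer.write(new byte[]{0, 0, 0, 1});  // one fixed-width element, as in the benchmark loop

File smooshDir = Files.createTempDir();
File file = File.createTempFile("genericIndexedBenchmark", "meta");

// No writer.close() anymore: writeTo(channel, smoosher) finalizes the output.
try (FileChannel channel =
         FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
     FileSmoosher smoosher = new FileSmoosher(smooshDir)) {
  writer.writeTo(channel, smoosher);
}
```
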
@@ -46,6 +46,7 @@
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.offheap.OffheapBufferGenerator;
+import io.druid.output.OffHeapMemoryOutputMediumFactory;
 import io.druid.query.DruidProcessingConfig;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.Query;
@@ -153,6 +154,7 @@ public class GroupByTypeInterfaceBenchmark
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
+        OffHeapMemoryOutputMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
@@ -162,7 +164,7 @@ public int columnCacheSizeBytes()
           }
         }
     );
-    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemoryOutputMediumFactory.instance());
   }

   private static final Map<String, Map<String, GroupByQuery>> SCHEMA_QUERY_MAP = new LinkedHashMap<>();
@@ -339,7 +341,8 @@ public void setup() throws IOException
     final File file = INDEX_MERGER_V9.persist(
         index,
         new File(tmpDir, String.valueOf(i)),
-        new IndexSpec()
+        new IndexSpec(),
+        null
     );

     queryableIndexes.add(INDEX_IO.loadIndex(file));