Change native parquet writer to write v1 parquet files #9611

Merged 1 commit on Oct 20, 2021
@@ -50,7 +50,7 @@
import static java.lang.Math.toIntExact;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class ParquetWriter
implements Closeable
@@ -90,7 +90,7 @@ public ParquetWriter(
requireNonNull(compressionCodecName, "compressionCodecName is null");

ParquetProperties parquetProperties = ParquetProperties.builder()
.withWriterVersion(PARQUET_2_0)
.withWriterVersion(PARQUET_1_0)
.withPageSize(writerOption.getMaxPageSize())
.build();

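For context on the one-line change above: selecting PARQUET_1_0 is what makes parquet-mr hand back v1-style page and level writers, so every page downstream is framed with the v1 DataPageHeader instead of DataPageHeaderV2. A minimal sketch of the builder call, assuming only the ParquetProperties API used in this diff (the class name and page-size constant are placeholders; the real writer takes the page size from writerOption.getMaxPageSize()):

import org.apache.parquet.column.ParquetProperties;

import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class WriterVersionSketch
{
    public static void main(String[] args)
    {
        // Placeholder page size; Trino's writer reads this from its writer options.
        int maxPageSizeBytes = 1024 * 1024;

        ParquetProperties parquetProperties = ParquetProperties.builder()
                .withWriterVersion(PARQUET_1_0)   // v1 pages and level writers
                .withPageSize(maxPageSizeBytes)
                .build();

        System.out.println(parquetProperties.getWriterVersion()); // PARQUET_1_0
    }
}
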
@@ -148,8 +148,8 @@ public ColumnWriter primitive(PrimitiveType primitive)
return new PrimitiveColumnWriter(
columnDescriptor,
getValueWriter(parquetProperties.newValuesWriter(columnDescriptor), trinoType, columnDescriptor.getPrimitiveType()),
parquetProperties.newDefinitionLevelEncoder(columnDescriptor),
parquetProperties.newRepetitionLevelEncoder(columnDescriptor),
parquetProperties.newDefinitionLevelWriter(columnDescriptor),
parquetProperties.newRepetitionLevelWriter(columnDescriptor),
compressionCodecName,
parquetProperties.getPageSizeThreshold());
}
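
The change above swaps parquetProperties.newDefinitionLevelEncoder/newRepetitionLevelEncoder for newDefinitionLevelWriter/newRepetitionLevelWriter, which return plain ValuesWriter instances. A short sketch of that call pattern, using a hypothetical one-column schema purely for illustration (class and schema names are not from the PR):

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class LevelWriterSketch
{
    public static void main(String[] args)
    {
        // Hypothetical flat schema: one optional column, so max definition level is 1.
        MessageType schema = MessageTypeParser.parseMessageType("message demo { optional int32 id; }");
        ColumnDescriptor column = schema.getColumns().get(0);

        ParquetProperties properties = ParquetProperties.builder()
                .withWriterVersion(PARQUET_1_0)
                .build();

        // For a max level > 0 this is an RLE/bit-packing hybrid ValuesWriter; the flat column's
        // repetition level is always 0, so its writer is effectively a no-op.
        ValuesWriter definitionLevelWriter = properties.newDefinitionLevelWriter(column);
        ValuesWriter repetitionLevelWriter = properties.newRepetitionLevelWriter(column);

        definitionLevelWriter.writeInteger(1); // definition level 1 = value present
        System.out.println(definitionLevelWriter.getEncoding()); // reported in the page header
    }
}
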
@@ -25,7 +25,7 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.format.ColumnMetaData;
import org.apache.parquet.format.PageEncodingStats;
import org.apache.parquet.format.PageType;
@@ -63,26 +63,25 @@ public class PrimitiveColumnWriter
private final CompressionCodecName compressionCodec;

private final PrimitiveValueWriter primitiveValueWriter;
private final RunLengthBitPackingHybridEncoder definitionLevelEncoder;
private final RunLengthBitPackingHybridEncoder repetitionLevelEncoder;
private final ValuesWriter definitionLevelWriter;
private final ValuesWriter repetitionLevelWriter;

private final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

private boolean closed;
private boolean getDataStreamsCalled;

// current page stats
private int currentPageRows;
private int valueCount;
private int currentPageNullCounts;
private int currentPageRowCount;

// column meta data stats
private final Set<Encoding> encodings = new HashSet<>();
private final Map<org.apache.parquet.format.Encoding, Integer> dataPagesWithEncoding = new HashMap<>();
private final Map<org.apache.parquet.format.Encoding, Integer> dictionaryPagesWithEncoding = new HashMap<>();
private long totalCompressedSize;
private long totalUnCompressedSize;
private long totalRows;
private long totalValues;
private Statistics<?> columnStatistics;

private final int maxDefinitionLevel;
@@ -94,18 +93,16 @@ public class PrimitiveColumnWriter

private final int pageSizeThreshold;

public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWriter primitiveValueWriter, RunLengthBitPackingHybridEncoder definitionLevelEncoder, RunLengthBitPackingHybridEncoder repetitionLevelEncoder, CompressionCodecName compressionCodecName, int pageSizeThreshold)
public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWriter primitiveValueWriter, ValuesWriter definitionLevelWriter, ValuesWriter repetitionLevelWriter, CompressionCodecName compressionCodecName, int pageSizeThreshold)
{
this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
this.maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();

this.definitionLevelEncoder = requireNonNull(definitionLevelEncoder, "definitionLevelEncoder is null");
this.repetitionLevelEncoder = requireNonNull(repetitionLevelEncoder, "repetitionLevelEncoder is null");
this.definitionLevelWriter = requireNonNull(definitionLevelWriter, "definitionLevelWriter is null");
this.repetitionLevelWriter = requireNonNull(repetitionLevelWriter, "repetitionLevelWriter is null");
this.primitiveValueWriter = requireNonNull(primitiveValueWriter, "primitiveValueWriter is null");
this.compressionCodec = requireNonNull(compressionCodecName, "compressionCodecName is null");
this.compressor = getCompressor(compressionCodecName);
this.pageSizeThreshold = pageSizeThreshold;

this.columnStatistics = Statistics.createStats(columnDescriptor.getPrimitiveType());
}

@@ -132,21 +129,18 @@ public void writeBlock(ColumnChunk columnChunk)
Iterator<Integer> defIterator = DefLevelIterables.getIterator(current.getDefLevelIterables());
while (defIterator.hasNext()) {
int next = defIterator.next();
definitionLevelEncoder.writeInt(next);
definitionLevelWriter.writeInteger(next);
if (next != maxDefinitionLevel) {
currentPageNullCounts++;
}
currentPageRows++;
valueCount++;
}

// write repetition levels
Iterator<Integer> repIterator = getIterator(current.getRepLevelIterables());
while (repIterator.hasNext()) {
int next = repIterator.next();
repetitionLevelEncoder.writeInt(next);
if (next == 0) {
currentPageRowCount++;
}
repetitionLevelWriter.writeInteger(next);
}

if (getBufferedBytes() >= pageSizeThreshold) {
@@ -178,14 +172,14 @@ private ColumnMetaData getColumnMetaData()
encodings.stream().map(parquetMetadataConverter::getEncoding).collect(toImmutableList()),
ImmutableList.copyOf(columnDescriptor.getPath()),
compressionCodec.getParquetCompressionCodec(),
totalRows,
totalValues,
totalUnCompressedSize,
totalCompressedSize,
-1);
columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
dataPagesWithEncoding.entrySet().stream()
.map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE_V2, encodingAndCount.getKey(), encodingAndCount.getValue()))
.map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
.forEach(pageEncodingStats::add);
dictionaryPagesWithEncoding.entrySet().stream()
.map(encodingAndCount -> new PageEncodingStats(PageType.DICTIONARY_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
@@ -203,76 +197,61 @@ private void flushCurrentPageToBuffer()
{
ImmutableList.Builder<ParquetDataOutput> outputDataStreams = ImmutableList.builder();

BytesInput bytes = primitiveValueWriter.getBytes();
ParquetDataOutput repetitions = createDataOutput(copy(repetitionLevelEncoder.toBytes()));
ParquetDataOutput definitions = createDataOutput(copy(definitionLevelEncoder.toBytes()));

// Add encoding should be called after primitiveValueWriter.getBytes() and before primitiveValueWriter.reset()
encodings.add(primitiveValueWriter.getEncoding());

long uncompressedSize = bytes.size() + repetitions.size() + definitions.size();

ParquetDataOutput data;
long compressedSize;
if (compressor != null) {
data = compressor.compress(bytes);
compressedSize = data.size() + repetitions.size() + definitions.size();
}
else {
data = createDataOutput(copy(bytes));
compressedSize = uncompressedSize;
}
BytesInput bytesInput = BytesInput.concat(copy(repetitionLevelWriter.getBytes()),
copy(definitionLevelWriter.getBytes()),
copy(primitiveValueWriter.getBytes()));
ParquetDataOutput pageData = (compressor != null) ? compressor.compress(bytesInput) : createDataOutput(bytesInput);
long uncompressedSize = bytesInput.size();
long compressedSize = pageData.size();

ByteArrayOutputStream pageHeaderOutputStream = new ByteArrayOutputStream();

Statistics<?> statistics = primitiveValueWriter.getStatistics();
statistics.incrementNumNulls(currentPageNullCounts);

columnStatistics.mergeStatistics(statistics);

parquetMetadataConverter.writeDataPageV2Header((int) uncompressedSize,
parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize,
(int) compressedSize,
currentPageRows,
currentPageNullCounts,
currentPageRowCount,
statistics,
valueCount,
repetitionLevelWriter.getEncoding(),
definitionLevelWriter.getEncoding(),
primitiveValueWriter.getEncoding(),
(int) repetitions.size(),
(int) definitions.size(),
pageHeaderOutputStream);

ParquetDataOutput pageHeader = createDataOutput(Slices.wrappedBuffer(pageHeaderOutputStream.toByteArray()));
outputDataStreams.add(pageHeader);
outputDataStreams.add(repetitions);
outputDataStreams.add(definitions);
outputDataStreams.add(data);
outputDataStreams.add(pageData);

List<ParquetDataOutput> dataOutputs = outputDataStreams.build();

dataPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);
dataPagesWithEncoding.merge(parquetMetadataConverter.getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);

// update total stats
totalCompressedSize += pageHeader.size() + compressedSize;
totalUnCompressedSize += pageHeader.size() + uncompressedSize;
totalRows += currentPageRows;
totalCompressedSize += pageHeader.size() + compressedSize;
totalValues += valueCount;

pageBuffer.addAll(dataOutputs);

// Add encoding should be called after ValuesWriter#getBytes() and before ValuesWriter#reset()
encodings.add(repetitionLevelWriter.getEncoding());
encodings.add(definitionLevelWriter.getEncoding());
encodings.add(primitiveValueWriter.getEncoding());

// reset page stats
currentPageRows = 0;
valueCount = 0;
currentPageNullCounts = 0;
currentPageRowCount = 0;

definitionLevelEncoder.reset();
repetitionLevelEncoder.reset();
repetitionLevelWriter.reset();
definitionLevelWriter.reset();
primitiveValueWriter.reset();
}

private List<ParquetDataOutput> getDataStreams()
throws IOException
{
List<ParquetDataOutput> dictPage = new ArrayList<>();
if (currentPageRows > 0) {
if (valueCount > 0) {
flushCurrentPageToBuffer();
}
// write dict page if possible
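
The rewritten flush above captures the core difference between the two page formats: a v1 data page concatenates repetition levels, definition levels, and values into one body that is compressed as a unit, and its header records only a value count plus the three encodings, whereas the old v2 path kept the levels as separate uncompressed streams and also tracked per-page null and row counts. A rough sketch of that v1 framing, with placeholder byte arrays and encodings standing in for the writers' output and no compressor applied (class and method names are illustrative, not from the PR):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.format.converter.ParquetMetadataConverter;

public class DataPageV1Sketch
{
    // repetitionLevels, definitionLevels and values stand in for the bytes produced by
    // repetitionLevelWriter, definitionLevelWriter and primitiveValueWriter.
    public static byte[] writePage(byte[] repetitionLevels, byte[] definitionLevels, byte[] values, int valueCount)
            throws IOException
    {
        // v1: levels and values form one page body, compressed together when a codec is set.
        BytesInput pageBody = BytesInput.concat(
                BytesInput.from(repetitionLevels),
                BytesInput.from(definitionLevels),
                BytesInput.from(values));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        new ParquetMetadataConverter().writeDataPageV1Header(
                (int) pageBody.size(), // uncompressed size
                (int) pageBody.size(), // compressed size (no compressor in this sketch)
                valueCount,
                Encoding.RLE,          // repetition level encoding
                Encoding.RLE,          // definition level encoding
                Encoding.PLAIN,        // value encoding
                out);
        pageBody.writeAllTo(out);
        return out.toByteArray();
    }
}

Because the levels live inside the compressed page body rather than in the v2-only layout, readers that predate the v2 page format can consume these files, which is why the expected-failure workarounds in the product tests below are removed.
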
@@ -314,8 +293,8 @@ public long getBufferedBytes()
public long getBufferedBytes()
{
return pageBuffer.stream().mapToLong(ParquetDataOutput::size).sum() +
definitionLevelEncoder.getBufferedSize() +
repetitionLevelEncoder.getBufferedSize() +
definitionLevelWriter.getBufferedSize() +
repetitionLevelWriter.getBufferedSize() +
primitiveValueWriter.getBufferedSize();
}

@@ -324,22 +303,22 @@ public long getRetainedBytes()
{
return INSTANCE_SIZE +
primitiveValueWriter.getAllocatedSize() +
definitionLevelEncoder.getAllocatedSize() +
repetitionLevelEncoder.getAllocatedSize();
definitionLevelWriter.getAllocatedSize() +
repetitionLevelWriter.getAllocatedSize();
}

@Override
public void reset()
{
definitionLevelEncoder.reset();
repetitionLevelEncoder.reset();
definitionLevelWriter.reset();
repetitionLevelWriter.reset();
primitiveValueWriter.reset();
pageBuffer.clear();
closed = false;

totalCompressedSize = 0;
totalUnCompressedSize = 0;
totalRows = 0;
totalValues = 0;
encodings.clear();
dataPagesWithEncoding.clear();
dictionaryPagesWithEncoding.clear();
@@ -16,7 +16,6 @@
import io.trino.tempto.Requirement;
import io.trino.tempto.RequirementsProvider;
import io.trino.tempto.configuration.Configuration;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

@@ -32,7 +31,6 @@
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestHiveCompression
extends HiveProductTest
@@ -91,20 +89,7 @@ public void testSnappyCompressedParquetTableCreatedInTrino()
@Test(groups = HIVE_COMPRESSION)
public void testSnappyCompressedParquetTableCreatedInTrinoWithNativeWriter()
{
if (getHiveVersionMajor() >= 2) {
testSnappyCompressedParquetTableCreatedInTrino(true);
return;
}

// TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet writer creates files that cannot be read by Hive
assertThatThrownBy(() -> testSnappyCompressedParquetTableCreatedInTrino(true))
.hasStackTraceContaining("at org.apache.hive.jdbc.HiveQueryResultSet.next") // comes via Hive JDBC
.extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
// There are a few cases here each of which are downstream:
// - HDP 2 and CDH 5 cannot read Parquet V2 files and throw "org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file"
// - CDH 5 Parquet uses parquet.* packages, while HDP 2 uses org.apache.parquet.* packages
// - HDP 3 throws java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable
.matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: java.io.IOException:\\E (org.apache.)?parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file .*");
testSnappyCompressedParquetTableCreatedInTrino(true);
}

private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedParquetWriter)
@@ -14,7 +14,6 @@
package io.trino.tests.product.hive;

import io.trino.tempto.ProductTest;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@@ -36,7 +35,6 @@
import static java.lang.String.join;
import static java.util.Collections.nCopies;
import static java.util.Locale.ENGLISH;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestHiveSparkCompatibility
extends ProductTest
@@ -215,11 +213,7 @@ public void testReadTrinoCreatedParquetTable()
public void testReadTrinoCreatedParquetTableWithNativeWriter()
{
onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".experimental_parquet_optimized_writer_enabled = true");
// TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet Writer writes Parquet V2 files that are not compatible with Spark's vectorized reader, see https://github.com/trinodb/trino/issues/7953 for more details
assertThatThrownBy(() -> testReadTrinoCreatedTable("using_native_parquet", "PARQUET"))
.hasStackTraceContaining("at org.apache.hive.jdbc.HiveStatement.execute")
.extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
.matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: Error running query: java.lang.UnsupportedOperationException: Unsupported encoding: RLE\\E");
testReadTrinoCreatedTable("using_native_parquet", "PARQUET");
}

private void testReadTrinoCreatedTable(String tableName, String tableFormat)