Reinitialize the column writers when flushing a row group #10418

Merged 2 commits on Feb 1, 2022
@@ -83,10 +83,4 @@ public long getRetainedBytes()
     {
         return INSTANCE_SIZE + elementWriter.getRetainedBytes();
     }
-
-    @Override
-    public void reset()
-    {
-        elementWriter.reset();
-    }
 }
@@ -34,8 +34,6 @@ List<BufferData> getBuffer()

     long getRetainedBytes();

-    void reset();
-
     class BufferData
     {
         private final ColumnMetaData metaData;
@@ -87,11 +87,4 @@ public long getRetainedBytes()
     {
         return INSTANCE_SIZE + keyWriter.getRetainedBytes() + valueWriter.getRetainedBytes();
     }
-
-    @Override
-    public void reset()
-    {
-        keyWriter.reset();
-        valueWriter.reset();
-    }
 }
@@ -59,15 +59,17 @@ public class ParquetWriter

     private static final int CHUNK_MAX_BYTES = toIntExact(DataSize.of(128, MEGABYTE).toBytes());

-    private final List<ColumnWriter> columnWriters;
     private final OutputStreamSliceOutput outputStream;
     private final ParquetWriterOptions writerOption;
     private final MessageType messageType;
     private final String createdBy;
     private final int chunkMaxLogicalBytes;
+    private final Map<List<String>, Type> primitiveTypes;
+    private final CompressionCodecName compressionCodecName;

     private final ImmutableList.Builder<RowGroup> rowGroupBuilder = ImmutableList.builder();

+    private List<ColumnWriter> columnWriters;
     private int rows;
     private long bufferedBytes;
     private boolean closed;

@@ -85,17 +87,10 @@ public ParquetWriter(
     {
         this.outputStream = new OutputStreamSliceOutput(requireNonNull(outputStream, "outputstream is null"));
         this.messageType = requireNonNull(messageType, "messageType is null");
-        requireNonNull(primitiveTypes, "primitiveTypes is null");
+        this.primitiveTypes = requireNonNull(primitiveTypes, "primitiveTypes is null");
         this.writerOption = requireNonNull(writerOption, "writerOption is null");
-        requireNonNull(compressionCodecName, "compressionCodecName is null");
-
-        ParquetProperties parquetProperties = ParquetProperties.builder()
-                .withWriterVersion(PARQUET_1_0)
-                .withPageSize(writerOption.getMaxPageSize())
-                .build();
-
-        this.columnWriters = ParquetWriters.getColumnWriters(messageType, primitiveTypes, parquetProperties, compressionCodecName);
-
+        this.compressionCodecName = requireNonNull(compressionCodecName, "compressionCodecName is null");
+        initColumnWriters();
         this.chunkMaxLogicalBytes = max(1, CHUNK_MAX_BYTES / 2);
         this.createdBy = formatCreatedBy(requireNonNull(trinoVersion, "trinoVersion is null"));
     }
@@ -164,7 +159,7 @@ private void writeChunk(Page page)
         if (bufferedBytes >= writerOption.getMaxRowGroupSize()) {
            columnWriters.forEach(ColumnWriter::close);
             flush();
-            columnWriters.forEach(ColumnWriter::reset);
+            initColumnWriters();
@findinpath (Contributor, Author) commented on Dec 28, 2021:

This bugfix follows the same strategy as the parquet-mr project:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java#L157-L159

After the row group has been flushed, the column writers are reinitialized from scratch.

The FallbackValuesWriter doesn't reset both of its writers within its #reset() method, and this leads to the unwanted behavior documented in #5518 (comment).

One concern I have with this approach is that reinitializing the writers may affect performance. I'd appreciate feedback on this.
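To make the failure mode concrete, here is a minimal self-contained sketch (hypothetical classes, not parquet-mr's actual API) of a composite writer whose reset() misses one of its delegates, so values buffered for the previous row group leak into the next one:

import java.util.ArrayList;
import java.util.List;

public class FallbackResetSketch
{
    static class BufferingWriter
    {
        final List<Integer> buffer = new ArrayList<>();

        void write(int value)
        {
            buffer.add(value);
        }

        void reset()
        {
            buffer.clear();
        }
    }

    // Simplified stand-in for a fallback-style writer: it owns two delegates
    // (think dictionary writer plus plain writer) but resets only one of them.
    static class BuggyFallbackWriter
    {
        final BufferingWriter initialWriter = new BufferingWriter();
        final BufferingWriter fallbackWriter = new BufferingWriter();

        void write(int value)
        {
            initialWriter.write(value);
            fallbackWriter.write(value);
        }

        void reset()
        {
            initialWriter.reset();
            // Bug: fallbackWriter.reset() is never called, so its buffer survives the flush
        }
    }

    public static void main(String[] args)
    {
        BuggyFallbackWriter writer = new BuggyFallbackWriter();
        writer.write(1);
        writer.reset(); // row group 1 is flushed
        writer.write(2);
        // Row group 2 should contain only [2], but the fallback path still holds [1, 2]
        System.out.println(writer.fallbackWriter.buffer);
    }
}

Recreating the writers after each flush, as this PR does, makes the completeness of any individual delegate's reset() irrelevant.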

@findinpath (Contributor, Author) commented on Dec 28, 2021:

Comparing https://github.com/trinodb/trino/runs/4650149952?check_suite_focus=true (a test run of this branch) with https://github.com/trinodb/trino/runs/4649720577?check_suite_focus=true (a build on master) on TestFullParquetReader, I see differences of about 20% in favor of the master implementation over the current bugfix implementation (for the trino-hive and trino-parquet tests).

I ran the tests again (https://github.com/trinodb/trino/runs/4652707273?check_suite_focus=true) and found no major difference between the runs. In most cases, the tests actually ran faster on the bugfix/parquet-9749 branch than on master.

TestFullParquetReader.testNestedStructs took 1m / 2m on the bugfix branch and 8 seconds on master. I'm not sure whether this is relevant or linked to the GitHub build runner.

Member commented:

> The FallbackValuesWriter doesn't reset both of its writers within its #reset() method, and this leads to the unwanted behavior documented in #5518 (comment).

When does the FallbackValuesWriter get used?

> One concern I have with this approach is that reinitializing the writers may affect performance.

That should not be much of a problem. It happens once per row group, and it doesn't seem to be a very expensive operation.

@findinpath (Contributor, Author) commented:

> When does the FallbackValuesWriter get used?

It gets used as a fallback when the dictionary encoding grows too big:
https://github.com/apache/parquet-format/blob/master/Encodings.md#dictionary-encoding-plain_dictionary--2-and-rle_dictionary--8
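For illustration, a rough sketch of that trigger (hypothetical names and threshold, not the actual parquet-mr implementation): values are dictionary-encoded until the dictionary grows past a byte limit, after which the writer switches to plain encoding for the rest of the column chunk:

import java.util.HashMap;
import java.util.Map;

public class DictionaryFallbackSketch
{
    private static final long MAX_DICTIONARY_BYTES = 1024 * 1024;

    private final Map<String, Integer> dictionary = new HashMap<>();
    private long dictionaryBytes;
    private boolean fellBackToPlain;

    public void write(String value)
    {
        if (fellBackToPlain) {
            writePlain(value);
            return;
        }
        // Assign the next id to unseen values and track the dictionary's size
        dictionary.computeIfAbsent(value, v -> {
            dictionaryBytes += v.length();
            return dictionary.size();
        });
        if (dictionaryBytes > MAX_DICTIONARY_BYTES) {
            fellBackToPlain = true; // too many distinct values: stop dictionary-encoding
        }
    }

    private void writePlain(String value)
    {
        // plain (non-dictionary) encoding of the value would happen here
    }
}

The test added below appears designed around exactly this split: plain_col receives only unique values, pushing its writer toward the fallback path, while dict_col receives heavily repeated values and stays dictionary-encoded.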

             rows = 0;
             bufferedBytes = columnWriters.stream().mapToLong(ColumnWriter::getBufferedBytes).sum();
         }
@@ -289,4 +284,14 @@ static String formatCreatedBy(String trinoVersion)
         // Add "(build n/a)" suffix to satisfy Parquet's VersionParser expectations
         return "Trino version " + trinoVersion + " (build n/a)";
     }
+
+    private void initColumnWriters()
+    {
+        ParquetProperties parquetProperties = ParquetProperties.builder()
+                .withWriterVersion(PARQUET_1_0)
+                .withPageSize(writerOption.getMaxPageSize())
+                .build();
+
+        this.columnWriters = ParquetWriters.getColumnWriters(messageType, primitiveTypes, parquetProperties, compressionCodecName);
+    }
 }
@@ -306,24 +306,4 @@ public long getRetainedBytes()
                 definitionLevelWriter.getAllocatedSize() +
                 repetitionLevelWriter.getAllocatedSize();
     }
-
-    @Override
-    public void reset()
-    {
-        definitionLevelWriter.reset();
-        repetitionLevelWriter.reset();
-        primitiveValueWriter.reset();
-        pageBuffer.clear();
-        closed = false;
-
-        totalCompressedSize = 0;
-        totalUnCompressedSize = 0;
-        totalValues = 0;
-        encodings.clear();
-        dataPagesWithEncoding.clear();
-        dictionaryPagesWithEncoding.clear();
-        this.columnStatistics = Statistics.createStats(columnDescriptor.getPrimitiveType());
-
-        getDataStreamsCalled = false;
-    }
 }
@@ -95,10 +95,4 @@ public long getRetainedBytes()
         return INSTANCE_SIZE +
                 columnWriters.stream().mapToLong(ColumnWriter::getRetainedBytes).sum();
     }
-
-    @Override
-    public void reset()
-    {
-        columnWriters.forEach(ColumnWriter::reset);
-    }
 }
@@ -14,8 +14,15 @@
 package io.trino.plugin.iceberg;

 import io.trino.Session;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.sql.TestTable;
 import org.testng.annotations.Test;

+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.testng.Assert.assertEquals;

 public class TestIcebergParquetConnectorTest
         extends BaseIcebergConnectorTest
@@ -39,6 +46,24 @@ protected boolean supportsRowGroupStatistics(String typeName)
                 typeName.equalsIgnoreCase("timestamp(6) with time zone"));
     }

+    @Test
+    public void testRowGroupResetDictionary()
+    {
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_row_group_reset_dictionary",
+                "(plain_col varchar, dict_col int)")) {
+            String tableName = table.getName();
+            String values = IntStream.range(0, 100)
+                    .mapToObj(i -> "('ABCDEFGHIJ" + i + "' , " + (i < 20 ? "1" : "null") + ")")
+                    .collect(Collectors.joining(", "));
+            assertUpdate(withSmallRowGroups(getSession()), "INSERT INTO " + tableName + " VALUES " + values, 100);
+
+            MaterializedResult result = getDistributedQueryRunner().execute(String.format("SELECT * FROM %s", tableName));
+            assertEquals(result.getRowCount(), 100);
+        }
+    }
+
     @Override
     protected Session withSmallRowGroups(Session session)
     {