Reset all PrimitiveColumnWriter fields when writer is reset
Fixes issue trinodb#5518
alexjo2144 authored and sumannewton committed Jan 15, 2022
1 parent 0244efd commit c1c6ba8
Showing 2 changed files with 48 additions and 0 deletions.
@@ -315,6 +315,9 @@ public long getRetainedBytes()
     @Override
     public void reset()
     {
+        definitionLevelEncoder.reset();
+        repetitionLevelEncoder.reset();
+        primitiveValueWriter.reset();
         pageBuffer.clear();
         closed = false;
 
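Why these three calls: reset() previously cleared only pageBuffer, so any levels or values still buffered in the encoders survived into the next row group. Below is a minimal, self-contained sketch of that failure mode and the fix; BufferingEncoder and the field names are hypothetical stand-ins, not Trino's actual writer classes.

    // Sketch only: hypothetical stand-in types, not Trino's actual writer classes.
    import java.util.ArrayList;
    import java.util.List;

    public class StaleEncoderStateDemo
    {
        // Stand-in for a level/value encoder that buffers input until a page is flushed
        static class BufferingEncoder
        {
            private final List<Integer> buffered = new ArrayList<>();

            void add(int value)
            {
                buffered.add(value);
            }

            int bufferedCount()
            {
                return buffered.size();
            }

            void reset()
            {
                buffered.clear();
            }
        }

        public static void main(String[] args)
        {
            BufferingEncoder definitionLevelEncoder = new BufferingEncoder();
            List<byte[]> pageBuffer = new ArrayList<>();

            // A write leaves some definition levels buffered but not yet flushed to a page
            definitionLevelEncoder.add(1);
            definitionLevelEncoder.add(1);

            // Buggy reset (pre-fix shape): only the page buffer is cleared,
            // so two stale levels would leak into the next row group
            pageBuffer.clear();
            System.out.println("after buggy reset: " + definitionLevelEncoder.bufferedCount()); // prints 2

            // Fixed reset, mirroring the commit: clear the encoders as well
            definitionLevelEncoder.reset();
            pageBuffer.clear();
            System.out.println("after fixed reset: " + definitionLevelEncoder.bufferedCount()); // prints 0
        }
    }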
@@ -16,13 +16,15 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
+import io.airlift.units.DataSize;
 import io.trino.plugin.hive.HiveTimestampPrecision;
 import io.trino.tempto.ProductTest;
 import io.trino.tempto.assertions.QueryAssert.Row;
 import io.trino.tempto.query.QueryExecutor.QueryParam;
 import io.trino.tempto.query.QueryResult;
 import io.trino.testng.services.Flaky;
 import io.trino.tests.product.utils.JdbcDriverUtils;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.assertj.core.api.SoftAssertions;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -588,6 +590,49 @@ public void testStructTimestampsFromTrino(StorageFormat format)
         onTrino().executeQuery(format("DROP TABLE %s", tableName));
     }
 
+    // These are regression tests for https://github.com/trinodb/trino/issues/5518
+    // The Parquet session properties are set to trigger the conditions in the Parquet writer that reproduce the bug.
+    // Not included in the STORAGE_FORMATS group because they require a large insert, which takes some time.
+    @Test
+    public void testLargeParquetInsert()
+    {
+        DataSize reducedRowGroupSize = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE / 4);
+        runLargeInsert(storageFormat(
+                "PARQUET",
+                ImmutableMap.of(
+                        "hive.parquet_writer_page_size", reducedRowGroupSize.toBytesValueString(),
+                        "task_writer_count", "1")));
+    }
+
+    @Test
+    public void testLargeParquetInsertWithNativeWriter()
+    {
+        DataSize reducedRowGroupSize = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE / 4);
+        runLargeInsert(storageFormat(
+                "PARQUET",
+                ImmutableMap.of(
+                        "hive.experimental_parquet_optimized_writer_enabled", "true",
+                        "hive.parquet_writer_page_size", reducedRowGroupSize.toBytesValueString(),
+                        "task_writer_count", "1")));
+    }
+
+    @Test
+    public void testLargeOrcInsert()
+    {
+        runLargeInsert(storageFormat("ORC", ImmutableMap.of("hive.orc_optimized_writer_validate", "true")));
+    }
+
+    private void runLargeInsert(StorageFormat storageFormat)
+    {
+        String tableName = "test_large_insert_" + storageFormat.getName() + randomTableSuffix();
+        setSessionProperties(storageFormat);
+        query("CREATE TABLE " + tableName + " WITH (" + storageFormat.getStoragePropertiesAsSql() + ") AS SELECT * FROM tpch.sf1.lineitem WHERE false");
+        query("INSERT INTO " + tableName + " SELECT * FROM tpch.sf1.lineitem");
+
+        assertThat(query("SELECT count(*) FROM " + tableName)).containsOnly(row(6001215L));
+        onTrino().executeQuery("DROP TABLE " + tableName);
+    }
+
     private String createSimpleTimestampTable(String tableNamePrefix, StorageFormat format)
     {
         return createTestTable(tableNamePrefix, format, "(id BIGINT, ts TIMESTAMP)");
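For context on the page-size arithmetic in these tests: parquet-hadoop's ParquetWriter.DEFAULT_PAGE_SIZE is 1 MiB, so the tests run the writer with a 256 KiB page size, flushing pages far more often over the 6,001,215-row lineitem insert, while task_writer_count = 1 funnels every row through a single writer instance. A quick sanity check of the string handed to hive.parquet_writer_page_size, assuming Airlift's DataSize renders byte values as shown:

    import io.airlift.units.DataSize;
    import org.apache.parquet.hadoop.ParquetWriter;

    public class PageSizeCheck
    {
        public static void main(String[] args)
        {
            // ParquetWriter.DEFAULT_PAGE_SIZE is 1 MiB, so a quarter of it is 262144 bytes
            DataSize reducedPageSize = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE / 4);

            // Expected to print "262144B": the value the tests pass as hive.parquet_writer_page_size
            System.out.println(reducedPageSize.toBytesValueString());
        }
    }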
