Add support for parquet_writer_version, 1.0 and 2.0 #20209

Closed

HiveSessionProperties.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

import javax.inject.Inject;

@@ -84,6 +85,7 @@ public final class HiveSessionProperties
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String PARQUET_WRITER_VERSION = "parquet_writer_version";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled";
private static final String MAX_SPLIT_SIZE = "max_split_size";
private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
@@ -150,8 +152,7 @@ public final class HiveSessionProperties
@Inject
public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterConfig orcFileWriterConfig, ParquetFileWriterConfig parquetFileWriterConfig, CacheConfig cacheConfig)
{
-sessionProperties = ImmutableList.of(
-        booleanProperty(
+sessionProperties = ImmutableList.of(booleanProperty(
IGNORE_TABLE_BUCKETING,
"Ignore table bucketing to enable reading from unbucketed partitions",
hiveClientConfig.isIgnoreTableBucketing(),
@@ -349,6 +350,15 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Parquet: Maximum size of a block to read",
hiveClientConfig.getParquetMaxReadBlockSize(),
false),
new PropertyMetadata<>(
PARQUET_WRITER_VERSION,
"Parquet: Writer format version",
VARCHAR,
WriterVersion.class,
parquetFileWriterConfig.getFormatVersion(),
false,
value -> WriterVersion.valueOf((String) value),
WriterVersion::toString),
dataSizeSessionProperty(
PARQUET_WRITER_BLOCK_SIZE,
"Parquet: Writer block size",
@@ -899,6 +909,11 @@ public static DataSize getParquetMaxReadBlockSize(ConnectorSession session)
return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class);
}

public static WriterVersion getParquetWriterVersion(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_VERSION, WriterVersion.class);
}

public static DataSize getParquetWriterBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class);
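With the property registered, the Parquet format version can be switched per query session. A usage sketch, assuming the Hive catalog is named hive; the accepted values are the WriterVersion enum constant names, since the property is decoded with WriterVersion.valueOf:

    SET SESSION hive.parquet_writer_version = 'PARQUET_1_0';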
ParquetFileWriterConfig.java
@@ -15,17 +15,25 @@

import com.facebook.airlift.configuration.Config;
import io.airlift.units.DataSize;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.hadoop.ParquetWriter;

import static io.airlift.units.DataSize.Unit.BYTE;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;

public class ParquetFileWriterConfig
{
private boolean parquetOptimizedWriterEnabled;

private WriterVersion formatVersion = PARQUET_2_0;
private DataSize blockSize = new DataSize(ParquetWriter.DEFAULT_BLOCK_SIZE, BYTE);
private DataSize pageSize = new DataSize(ParquetWriter.DEFAULT_PAGE_SIZE, BYTE);

public WriterVersion getFormatVersion()
{
return formatVersion;
}

public DataSize getBlockSize()
{
return blockSize;
@@ -43,6 +51,13 @@ public DataSize getPageSize()
return pageSize;
}

@Config("hive.parquet.writer.format-version")
public ParquetFileWriterConfig setFormatVersion(WriterVersion formatVersion)
{
this.formatVersion = formatVersion;
return this;
}

@Config("hive.parquet.writer.page-size")
public ParquetFileWriterConfig setPageSize(DataSize pageSize)
{
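The @Config annotation also makes the default settable per catalog. A sketch of the corresponding entry in a Hive catalog properties file, assuming Airlift's usual enum binding by constant name:

    hive.parquet.writer.format-version=PARQUET_1_0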
ParquetFileWriterFactory.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTimeZone;
@@ -45,6 +46,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
import static com.facebook.presto.hive.HiveSessionProperties.getParquetWriterBlockSize;
import static com.facebook.presto.hive.HiveSessionProperties.getParquetWriterPageSize;
import static com.facebook.presto.hive.HiveSessionProperties.getParquetWriterVersion;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetOptimizedWriterEnabled;
import static com.facebook.presto.hive.HiveType.toHiveTypes;
import static java.util.Objects.requireNonNull;
@@ -101,6 +103,7 @@ public Optional<HiveFileWriter> createFileWriter(
}

ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
.setWriterVersion((ParquetProperties.WriterVersion) getParquetWriterVersion(session))
.setMaxPageSize(getParquetWriterPageSize(session))
.setMaxBlockSize(getParquetWriterBlockSize(session))
.build();
IcebergFileWriterFactory.java
@@ -61,6 +61,7 @@
import static com.facebook.presto.iceberg.IcebergSessionProperties.getOrcMaxMergeDistance;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getOrcOptimizedWriterValidateMode;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getOrcStreamBufferSize;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getParquetWriterVersion;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isOrcOptimizedWriterValidate;
import static com.facebook.presto.iceberg.TypeConverter.toOrcType;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
@@ -140,6 +141,7 @@ private IcebergFileWriter createParquetWriter(
};

ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
.setWriterVersion(getParquetWriterVersion(session))
.setMaxPageSize(getParquetWriterPageSize(session))
.setMaxBlockSize(getParquetWriterBlockSize(session))
.build();
IcebergSessionProperties.java
@@ -26,6 +26,7 @@
import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

import javax.inject.Inject;

@@ -50,6 +51,7 @@ public final class IcebergSessionProperties
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String PARQUET_WRITER_VERSION = "parquet_writer_version";
private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names";
private static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled";
private static final String PARQUET_BATCH_READER_VERIFICATION_ENABLED = "parquet_batch_reader_verification_enabled";
@@ -119,6 +121,15 @@ public IcebergSessionProperties(
"Parquet: Maximum size of a block to read",
hiveClientConfig.getParquetMaxReadBlockSize(),
false),
new PropertyMetadata<>(
PARQUET_WRITER_VERSION,
"Parquet: Writer format version",
VARCHAR,
WriterVersion.class,
parquetFileWriterConfig.getFormatVersion(),
false,
value -> WriterVersion.valueOf((String) value),
WriterVersion::toString),
dataSizeSessionProperty(
PARQUET_WRITER_BLOCK_SIZE,
"Parquet: Writer block size",
@@ -301,6 +312,11 @@ public static DataSize getParquetWriterBlockSize(ConnectorSession session)
return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class);
}

public static WriterVersion getParquetWriterVersion(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_VERSION, WriterVersion.class);
}

public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
{
return new PropertyMetadata<>(
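The Iceberg connector gets the same session switch; a sketch assuming a catalog named iceberg:

    SET SESSION iceberg.parquet_writer_version = 'PARQUET_1_0';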
ParquetWriter.java
@@ -48,7 +48,6 @@
import static java.lang.Math.toIntExact;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Objects.requireNonNull;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.BROTLI;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4;
@@ -100,9 +99,10 @@ public ParquetWriter(OutputStream outputStream,
this.messageType = requireNonNull(messageType, "messageType is null");

ParquetProperties parquetProperties = ParquetProperties.builder()
-        .withWriterVersion(PARQUET_2_0)
+        .withWriterVersion(writerOption.getWriterVersion())
.withPageSize(writerOption.getMaxPageSize())
.build();

CompressionCodecName compressionCodecName = getCompressionCodecName(compressionCodecClass);
this.columnWriters = ParquetWriters.getColumnWriters(messageType, primitiveTypes, parquetProperties, compressionCodecName);

@@ -253,6 +253,7 @@ static Slice getFooter(List<RowGroup> rowGroups, MessageType messageType)
long totalRows = rowGroups.stream().mapToLong(RowGroup::getNum_rows).sum();
fileMetaData.setNum_rows(totalRows);
fileMetaData.setRow_groups(ImmutableList.copyOf(rowGroups));
fileMetaData.setCreated_by("Presto");

DynamicSliceOutput dynamicSliceOutput = new DynamicSliceOutput(40);
Util.writeFileMetaData(fileMetaData, dynamicSliceOutput);
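Stamping created_by is more than cosmetic: parquet-mr keys some read-side behavior on the creator string (for example, CorruptStatistics consults it when deciding whether page statistics from known-bad writers should be ignored), so files written here now identify themselves as Presto output. One way to inspect the stamp, assuming the parquet-tools CLI is installed and /tmp/example.parquet is a hypothetical output file:

    parquet-tools meta /tmp/example.parquet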
ParquetWriterOptions.java
@@ -14,12 +14,14 @@
package com.facebook.presto.parquet.writer;

import io.airlift.units.DataSize;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class ParquetWriterOptions
{
private static final WriterVersion DEFAULT_FORMAT_VERSION = WriterVersion.PARQUET_2_0;
private static final DataSize DEFAULT_MAX_ROW_GROUP_SIZE = DataSize.valueOf("128MB");
private static final DataSize DEFAULT_MAX_PAGE_SIZE = DataSize.valueOf("1MB");

@@ -28,15 +30,22 @@ public static ParquetWriterOptions.Builder builder()
return new ParquetWriterOptions.Builder();
}

private final WriterVersion writerVersion;
private final int maxRowGroupSize;
private final int maxPageSize;

-private ParquetWriterOptions(DataSize maxRowGroupSize, DataSize maxPageSize)
+private ParquetWriterOptions(WriterVersion writerVersion, DataSize maxRowGroupSize, DataSize maxPageSize)
{
this.writerVersion = writerVersion;
this.maxRowGroupSize = toIntExact(requireNonNull(maxRowGroupSize, "maxRowGroupSize is null").toBytes());
this.maxPageSize = toIntExact(requireNonNull(maxPageSize, "maxPageSize is null").toBytes());
}

public WriterVersion getWriterVersion()
{
return writerVersion;
}

public int getMaxRowGroupSize()
{
return maxRowGroupSize;
@@ -49,9 +58,16 @@ public int getMaxPageSize()

public static class Builder
{
private WriterVersion writerVersion = DEFAULT_FORMAT_VERSION;
private DataSize maxBlockSize = DEFAULT_MAX_ROW_GROUP_SIZE;
private DataSize maxPageSize = DEFAULT_MAX_PAGE_SIZE;

public Builder setWriterVersion(WriterVersion formatVersion)
{
this.writerVersion = formatVersion;
return this;
}

public Builder setMaxBlockSize(DataSize maxBlockSize)
{
this.maxBlockSize = maxBlockSize;
@@ -66,7 +82,7 @@ public Builder setMaxPageSize(DataSize maxPageSize)

public ParquetWriterOptions build()
{
-        return new ParquetWriterOptions(maxBlockSize, maxPageSize);
+        return new ParquetWriterOptions(writerVersion, maxBlockSize, maxPageSize);
}
}
}
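Putting the builder pieces together, a minimal sketch of constructing the options; the wrapper class WriterOptionsExample is hypothetical, but the builder calls are exactly the ones added above:

    import com.facebook.presto.parquet.writer.ParquetWriterOptions;
    import io.airlift.units.DataSize;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;

    public class WriterOptionsExample
    {
        public static void main(String[] args)
        {
            // Request format v1 explicitly; unset fields fall back to the
            // defaults declared above (PARQUET_2_0, 128MB row groups, 1MB pages).
            ParquetWriterOptions options = ParquetWriterOptions.builder()
                    .setWriterVersion(WriterVersion.PARQUET_1_0)
                    .setMaxBlockSize(DataSize.valueOf("64MB"))
                    .setMaxPageSize(DataSize.valueOf("1MB"))
                    .build();
            System.out.println(options.getWriterVersion()); // prints PARQUET_1_0
        }
    }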
ParquetWriters.java
@@ -32,6 +32,7 @@
import com.google.common.collect.ImmutableList;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.GroupType;
@@ -130,6 +131,15 @@ public ColumnWriter primitive(PrimitiveType primitive)
int fieldRepetitionLevel = type.getMaxRepetitionLevel(path);
ColumnDescriptor columnDescriptor = new ColumnDescriptor(path, primitive, fieldRepetitionLevel, fieldDefinitionLevel);
Type prestoType = requireNonNull(prestoTypes.get(ImmutableList.copyOf(path)), " presto type is null");
if (this.parquetProperties.getWriterVersion().equals(WriterVersion.PARQUET_1_0)) {
return new PrimitiveColumnWriterV1(prestoType,
columnDescriptor,
getValueWriter(parquetProperties.newValuesWriter(columnDescriptor), prestoType, columnDescriptor.getPrimitiveType()),
parquetProperties.newDefinitionLevelEncoder(columnDescriptor),
parquetProperties.newRepetitionLevelEncoder(columnDescriptor),
compressionCodecName,
parquetProperties.getPageSizeThreshold());
}
return new PrimitiveColumnWriter(prestoType,
columnDescriptor,
getValueWriter(parquetProperties.newValuesWriter(columnDescriptor), prestoType, columnDescriptor.getPrimitiveType()),
Expand Down
Loading
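Why v1 needs a dedicated column writer, as a hedged aside on the Parquet format rather than on this change: v1 data pages compress the repetition levels, definition levels, and values together as one block, with the levels RLE-encoded behind a four-byte length prefix, whereas v2 pages keep the level bytes uncompressed ahead of the compressed values and carry extra counts in the page header. PrimitiveColumnWriterV1 therefore has to encode and assemble its pages differently, rather than merely flagging the version on the shared writer.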