Test out Parquet 1.14.0
Fokko committed May 1, 2024
1 parent 10ffc60 commit 157079b
Showing 7 changed files with 99 additions and 95 deletions.
3 changes: 3 additions & 0 deletions build.gradle
@@ -128,6 +128,9 @@ allprojects {
repositories {
mavenCentral()
mavenLocal()
maven {
url = uri("https://repository.apache.org/content/groups/staging")
}
}
}

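Note: the staging repository is what lets Gradle resolve Parquet 1.14.0 before it is promoted to Maven Central; Apache release candidates are published there while a release vote is in progress. To confirm the staged artifact is the one actually resolved, running Gradle's built-in dependencyInsight task (for example: ./gradlew dependencyInsight --dependency org.apache.parquet, a standard Gradle command, not part of this commit) is one option.
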
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
@@ -70,7 +70,7 @@ netty-buffer = "4.1.109.Final"
netty-buffer-compat = "4.1.109.Final"
object-client-bundle = "3.3.2"
orc = "1.9.3"
parquet = "1.13.1"
parquet = "1.14.0"
pig = "0.17.0"
roaringbitmap = "1.0.6"
s3mock-junit5 = "2.11.0"
10 changes: 10 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -103,6 +103,7 @@
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -366,6 +367,7 @@ public <D> FileAppender<D> build() throws IOException {
new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
.setType(type)
.withConf(conf)
.setConfig(config)
.setKeyValueMetadata(metadata)
.setWriteSupport(getWriteSupport(type))
@@ -991,6 +993,14 @@ protected WriteSupport<T> getWriteSupport(Configuration configuration) {
}
return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
}

@Override
protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
for (Map.Entry<String, String> entry : config.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
}
}

public static ReadBuilder read(InputFile file) {

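Background for the Parquet.java changes: Parquet 1.14.0 introduces the org.apache.parquet.conf.ParquetConfiguration interface, which decouples Parquet's read/write APIs from Hadoop's Configuration class, and ParquetWriter.Builder now appears to request its WriteSupport through a getWriteSupport(ParquetConfiguration) overload next to the Hadoop-based one. The override added above simply mirrors the existing method for the new type. A minimal standalone sketch of that dual-overload pattern (the ConfigCopier class is illustrative, not part of the commit):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.ParquetConfiguration;

// Illustrative helper: copy the same string config into whichever
// configuration type Parquet hands us, keeping both paths in sync.
final class ConfigCopier {
  private ConfigCopier() {}

  // Legacy Hadoop-backed path (the only option before 1.14.0).
  static void copyInto(Map<String, String> config, Configuration conf) {
    for (Map.Entry<String, String> entry : config.entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }
  }

  // Hadoop-free path available since Parquet 1.14.0.
  static void copyInto(Map<String, String> config, ParquetConfiguration conf) {
    for (Map.Entry<String, String> entry : config.entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }
  }
}

The .withConf(conf) call added to the writer builder plays the same role on the builder side, handing the Hadoop configuration to Parquet explicitly rather than implicitly.
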
This file was deleted.

parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.parquet;

import static org.apache.parquet.avro.AvroReadSupport.AVRO_REQUESTED_PROJECTION;

import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -27,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
@@ -104,6 +107,60 @@ public ReadContext init(
projection, context != null ? context.getReadSupportMetadata() : ImmutableMap.of());
}

@Override
@SuppressWarnings("deprecation")
public ReadContext init(
ParquetConfiguration configuration,
Map<String, String> keyValueMetaData,
MessageType fileSchema) {
// Columns are selected from the Parquet file by taking the read context's message type and
// matching to the file's columns by full path, so this must select columns by using the path
// in the file's schema.

MessageType projection;
if (ParquetSchemaUtil.hasIds(fileSchema)) {
projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
} else if (nameMapping != null) {
MessageType typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
} else {
projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
}

// override some known backward-compatibility options
configuration.set("parquet.strict.typing", "false");
configuration.set("parquet.avro.add-list-element-records", "false");
configuration.set("parquet.avro.write-old-list-structure", "false");

// set Avro schemas in case the reader is Avro
configuration.set(
AVRO_REQUESTED_PROJECTION,
AvroSchemaUtil.convert(expectedSchema, projection.getName()).toString());
org.apache.avro.Schema avroReadSchema =
AvroSchemaUtil.buildAvroProjection(
AvroSchemaUtil.convert(ParquetSchemaUtil.convert(projection), projection.getName()),
expectedSchema,
ImmutableMap.of());
configuration.set(
"parquet.avro.read.schema", ParquetAvro.parquetAvroSchema(avroReadSchema).toString());

// let the context set up read support metadata, but always use the correct projection
ReadContext context = null;
if (callInit) {
try {
context = wrapped.init(configuration, keyValueMetaData, projection);
} catch (UnsupportedOperationException e) {
// try the InitContext version
context =
wrapped.init(
new InitContext(configuration, makeMultimap(keyValueMetaData), projection));
}
}

return new ReadContext(
projection, context != null ? context.getReadSupportMetadata() : ImmutableMap.of());
}

@Override
public RecordMaterializer<T> prepareForRead(
Configuration configuration,
@@ -119,6 +176,21 @@ public RecordMaterializer<T> prepareForRead(
return wrapped.prepareForRead(configuration, fileMetadata, readSchema, readContext);
}

@Override
public RecordMaterializer<T> prepareForRead(
ParquetConfiguration configuration,
Map<String, String> fileMetadata,
MessageType fileMessageType,
ReadContext readContext) {
// This is the type created in init that was based on the file's schema. The schema that this
// will pass to the wrapped ReadSupport needs to match the expected schema's names. Rather than
// renaming the file's schema, convert the expected schema to Parquet. This relies on writing
// files with the correct schema.
// TODO: this breaks when columns are reordered.
MessageType readSchema = ParquetSchemaUtil.convert(expectedSchema, fileMessageType.getName());
return wrapped.prepareForRead(configuration, fileMetadata, readSchema, readContext);
}

private Map<String, Set<String>> makeMultimap(Map<String, String> map) {
ImmutableMap.Builder<String, Set<String>> builder = ImmutableMap.builder();
for (Map.Entry<String, String> entry : map.entrySet()) {
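
Two things are worth noting about the ParquetReadSupport additions. First, the new init(ParquetConfiguration, ...) and prepareForRead(ParquetConfiguration, ...) overloads duplicate their Hadoop-based twins almost line for line; only the configuration type differs, so the pairs must be kept in sync by hand. Second, the InitContext fallback relies on 1.14.0 accepting a ParquetConfiguration together with the multimap produced by makeMultimap (shown truncated above). The payoff of the new overloads is that a reader can be configured without Hadoop classes on the call path; a hypothetical caller-side sketch, assuming 1.14.0 ships PlainParquetConfiguration as the in-memory implementation (option keys copied from the init method above):

import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;

class ReadSupportConfigExample {
  // Hypothetical setup that would exercise the Hadoop-free init(...) path.
  static ParquetConfiguration plainConf() {
    ParquetConfiguration conf = new PlainParquetConfiguration();
    conf.set("parquet.strict.typing", "false");
    conf.set("parquet.avro.add-list-element-records", "false");
    return conf;
  }
}
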
parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
@@ -48,6 +49,17 @@ public WriteContext init(Configuration configuration) {
return new WriteContext(type, metadata);
}

@Override
public WriteContext init(ParquetConfiguration configuration) {
WriteContext wrappedContext = wrapped.init(configuration);
Map<String, String> metadata =
ImmutableMap.<String, String>builder()
.putAll(keyValueMetadata)
.putAll(wrappedContext.getExtraMetaData())
.buildOrThrow();
return new WriteContext(type, metadata);
}

@Override
public String getName() {
return "Iceberg/" + wrapped.getName();
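
As on the read side, the new init(ParquetConfiguration) mirrors its Hadoop twin. One detail worth knowing: buildOrThrow() fails on duplicate keys, so if the wrapped WriteSupport ever reported metadata under a key that keyValueMetadata also contains, init would throw rather than silently pick a winner. A small illustration of that Guava behavior (the key name is made up, not from the commit):

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class MetadataMergeExample {
  public static void main(String[] args) {
    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
    builder.putAll(ImmutableMap.of("writer.model", "iceberg"));
    builder.putAll(ImmutableMap.of("writer.model", "wrapped"));
    builder.buildOrThrow(); // throws IllegalArgumentException: duplicate key
  }
}
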
parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -86,8 +86,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
this.targetRowGroupSize = rowGroupSize;
this.props = properties;
this.metadata = ImmutableMap.copyOf(metadata);
this.compressor =
new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
this.metricsConfig = metricsConfig;
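
Finally, ParquetWriter switches from Iceberg's ParquetCodecFactory, presumably the file deleted earlier in this commit, back to parquet-hadoop's stock CodecFactory, which suggests the workaround that class carried is no longer needed on 1.14.0. A minimal sketch of using CodecFactory directly, assuming the 1.14.0 API surface (codec and page size are arbitrary choices, not values from the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

class CompressorExample {
  public static void main(String[] args) {
    int pageSizeThreshold = 1024 * 1024; // stand-in for props.getPageSizeThreshold()
    CodecFactory factory = new CodecFactory(new Configuration(), pageSizeThreshold);
    CompressionCodecFactory.BytesInputCompressor compressor =
        factory.getCompressor(CompressionCodecName.GZIP);
    compressor.release(); // compressors may hold native resources; release when done
  }
}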
