diff --git a/build.gradle b/build.gradle
index b30e4550cc60..0eb21513ea01 100644
--- a/build.gradle
+++ b/build.gradle
@@ -128,6 +128,9 @@ allprojects {
   repositories {
     mavenCentral()
     mavenLocal()
+    maven {
+      url = uri("https://repository.apache.org/content/groups/staging")
+    }
   }
 }
 
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index b561f66e0c34..776a4ea15d3d 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -70,7 +70,7 @@ netty-buffer = "4.1.109.Final"
 netty-buffer-compat = "4.1.109.Final"
 object-client-bundle = "3.3.2"
 orc = "1.9.3"
-parquet = "1.13.1"
+parquet = "1.14.0"
 pig = "0.17.0"
 roaringbitmap = "1.0.6"
 s3mock-junit5 = "2.11.0"
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index d591041d19c3..f16c739bad10 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -103,6 +103,7 @@
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -366,6 +367,7 @@ public <D> FileAppender<D> build() throws IOException {
           new ParquetWriteBuilder<D>(ParquetIO.file(file))
               .withWriterVersion(writerVersion)
               .setType(type)
+              .withConf(conf)
               .setConfig(config)
               .setKeyValueMetadata(metadata)
               .setWriteSupport(getWriteSupport(type))
@@ -991,6 +993,14 @@ protected WriteSupport<T> getWriteSupport(Configuration configuration) {
       }
       return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
     }
+
+    @Override
+    protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        conf.set(entry.getKey(), entry.getValue());
+      }
+      return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
+    }
   }
 
   public static ReadBuilder read(InputFile file) {
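
(Not part of the change) For context on the new ParquetConfiguration hook above: the override copies the builder's config map onto the interface-based configuration, exactly like the existing Hadoop Configuration overload. A minimal sketch of that copy step, assuming Parquet 1.14.0's org.apache.parquet.conf.PlainParquetConfiguration as the Hadoop-free implementation of the interface; withWriterConfig is a made-up helper name used only for illustration:

    import java.util.Map;
    import org.apache.parquet.conf.ParquetConfiguration;
    import org.apache.parquet.conf.PlainParquetConfiguration;

    class WriteConfSketch {
      // Hypothetical helper mirroring the loop in getWriteSupport(ParquetConfiguration) above:
      // copy every builder config entry onto the Parquet configuration.
      static ParquetConfiguration withWriterConfig(Map<String, String> config) {
        ParquetConfiguration conf = new PlainParquetConfiguration(); // assumed 1.14.0 class
        for (Map.Entry<String, String> entry : config.entrySet()) {
          conf.set(entry.getKey(), entry.getValue());
        }
        return conf;
      }

      public static void main(String[] args) {
        ParquetConfiguration conf =
            withWriterConfig(Map.of("parquet.avro.write-old-list-structure", "false"));
        System.out.println(conf.get("parquet.avro.write-old-list-structure"));
      }
    }

Keeping both overloads means the write path works whether the underlying Parquet builder drives it through a Hadoop Configuration or through the new interface.
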
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
deleted file mode 100644
index bfcece6259a6..000000000000
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.parquet.hadoop.BadConfigurationException;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-/**
- * This class implements a codec factory that is used when reading from Parquet. It adds a
- * workaround to cache codecs by name and level, not just by name. This can be removed when this
- * change is made to Parquet.
- */
-public class ParquetCodecFactory extends CodecFactory {
-
-  public ParquetCodecFactory(Configuration configuration, int pageSize) {
-    super(configuration, pageSize);
-  }
-
-  /**
-   * This is copied from {@link CodecFactory} and modified to include the level in the cache key.
-   */
-  @Override
-  protected CompressionCodec getCodec(CompressionCodecName codecName) {
-    String codecClassName = codecName.getHadoopCompressionCodecClassName();
-    if (codecClassName == null) {
-      return null;
-    }
-    String cacheKey = cacheKey(codecName);
-    CompressionCodec codec = CODEC_BY_NAME.get(cacheKey);
-    if (codec != null) {
-      return codec;
-    }
-
-    try {
-      Class<?> codecClass;
-      try {
-        codecClass = Class.forName(codecClassName);
-      } catch (ClassNotFoundException e) {
-        // Try to load the class using the job classloader
-        codecClass = configuration.getClassLoader().loadClass(codecClassName);
-      }
-      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
-      CODEC_BY_NAME.put(cacheKey, codec);
-      return codec;
-    } catch (ClassNotFoundException e) {
-      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
-    }
-  }
-
-  private String cacheKey(CompressionCodecName codecName) {
-    String level = null;
-    switch (codecName) {
-      case GZIP:
-        level = configuration.get("zlib.compress.level");
-        break;
-      case BROTLI:
-        level = configuration.get("compression.brotli.quality");
-        break;
-      case ZSTD:
-        level = configuration.get("parquet.compression.codec.zstd.level");
-        if (level == null) {
-          // keep "io.compression.codec.zstd.level" for backwards compatibility
-          level = configuration.get("io.compression.codec.zstd.level");
-        }
-        break;
-      default:
-        // compression level is not supported; ignore it
-    }
-    String codecClass = codecName.getHadoopCompressionCodecClassName();
-    return level == null ? codecClass : codecClass + ":" + level;
-  }
-}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
index b1172147f80a..3351245c69ef 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.parquet;
 
+import static org.apache.parquet.avro.AvroReadSupport.AVRO_REQUESTED_PROJECTION;
+
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +29,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.io.api.RecordMaterializer;
@@ -104,6 +107,60 @@ public ReadContext init(
         projection, context != null ? context.getReadSupportMetadata() : ImmutableMap.of());
   }
 
+  @Override
+  @SuppressWarnings("deprecation")
+  public ReadContext init(
+      ParquetConfiguration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema) {
+    // Columns are selected from the Parquet file by taking the read context's message type and
+    // matching to the file's columns by full path, so this must select columns by using the path
+    // in the file's schema.
+
+    MessageType projection;
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
+    } else if (nameMapping != null) {
+      MessageType typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
+      projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
+    } else {
+      projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    }
+
+    // override some known backward-compatibility options
+    configuration.set("parquet.strict.typing", "false");
+    configuration.set("parquet.avro.add-list-element-records", "false");
+    configuration.set("parquet.avro.write-old-list-structure", "false");
+
+    // set Avro schemas in case the reader is Avro
+    configuration.set(
+        AVRO_REQUESTED_PROJECTION,
+        AvroSchemaUtil.convert(expectedSchema, projection.getName()).toString());
+    org.apache.avro.Schema avroReadSchema =
+        AvroSchemaUtil.buildAvroProjection(
+            AvroSchemaUtil.convert(ParquetSchemaUtil.convert(projection), projection.getName()),
+            expectedSchema,
+            ImmutableMap.of());
+    configuration.set(
+        "parquet.avro.read.schema", ParquetAvro.parquetAvroSchema(avroReadSchema).toString());
+
+    // let the context set up read support metadata, but always use the correct projection
+    ReadContext context = null;
+    if (callInit) {
+      try {
+        context = wrapped.init(configuration, keyValueMetaData, projection);
+      } catch (UnsupportedOperationException e) {
+        // try the InitContext version
+        context =
+            wrapped.init(
+                new InitContext(configuration, makeMultimap(keyValueMetaData), projection));
+      }
+    }
+
+    return new ReadContext(
+        projection, context != null ? context.getReadSupportMetadata() : ImmutableMap.of());
+  }
+
   @Override
   public RecordMaterializer<T> prepareForRead(
       Configuration configuration,
@@ -119,6 +176,21 @@ public RecordMaterializer<T> prepareForRead(
     return wrapped.prepareForRead(configuration, fileMetadata, readSchema, readContext);
   }
 
+  @Override
+  public RecordMaterializer<T> prepareForRead(
+      ParquetConfiguration configuration,
+      Map<String, String> fileMetadata,
+      MessageType fileMessageType,
+      ReadContext readContext) {
+    // This is the type created in init that was based on the file's schema. The schema that this
+    // will pass to the wrapped ReadSupport needs to match the expected schema's names. Rather than
+    // renaming the file's schema, convert the expected schema to Parquet. This relies on writing
+    // files with the correct schema.
+    // TODO: this breaks when columns are reordered.
+    MessageType readSchema = ParquetSchemaUtil.convert(expectedSchema, fileMessageType.getName());
+    return wrapped.prepareForRead(configuration, fileMetadata, readSchema, readContext);
+  }
+
   private Map<String, Set<String>> makeMultimap(Map<String, String> map) {
     ImmutableMap.Builder<String, Set<String>> builder = ImmutableMap.builder();
     for (Map.Entry<String, String> entry : map.entrySet()) {
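
(Not part of the change) The new init(ParquetConfiguration, ...) and prepareForRead(ParquetConfiguration, ...) overloads mirror the Hadoop-based versions line for line. If the duplication ever becomes a maintenance concern, the shared compatibility overrides could be funneled through a setter callback, since both org.apache.hadoop.conf.Configuration and ParquetConfiguration expose set(String, String). A hypothetical sketch; the class and method names are invented here:

    import java.util.function.BiConsumer;

    final class ReadConfDefaults {
      private ReadConfDefaults() {}

      // Applies the same backward-compatibility overrides that both init(...) overloads set above.
      static void applyCompatibilityOptions(BiConsumer<String, String> setProperty) {
        setProperty.accept("parquet.strict.typing", "false");
        setProperty.accept("parquet.avro.add-list-element-records", "false");
        setProperty.accept("parquet.avro.write-old-list-structure", "false");
      }
    }

Either overload could then call ReadConfDefaults.applyCompatibilityOptions(configuration::set) instead of repeating the three properties.
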
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
index 985d0c51c609..f830a553fad9 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
@@ -48,6 +49,17 @@ public WriteContext init(Configuration configuration) {
     return new WriteContext(type, metadata);
   }
 
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
+    WriteContext wrappedContext = wrapped.init(configuration);
+    Map<String, String> metadata =
+        ImmutableMap.<String, String>builder()
+            .putAll(keyValueMetadata)
+            .putAll(wrappedContext.getExtraMetaData())
+            .buildOrThrow();
+    return new WriteContext(type, metadata);
+  }
+
   @Override
   public String getName() {
     return "Iceberg/" + wrapped.getName();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 099cffc33bb8..577004993711 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -86,8 +86,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
     this.metadata = ImmutableMap.copyOf(metadata);
-    this.compressor =
-        new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
+    this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
     this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
     this.metricsConfig = metricsConfig;
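
(Not part of the change) Deleting ParquetCodecFactory and pointing ParquetWriter back at Parquet's own CodecFactory presumably relies on Parquet 1.14.0 caching codecs by name and compression level itself, which is what the deleted class's javadoc anticipated. A minimal sketch of the resulting call path, using the same property name the workaround used to read; the page size and level values below are arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.CodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    class CompressorSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // the level is read from the configuration when the ZSTD codec is created
        conf.set("parquet.compression.codec.zstd.level", "3");

        CodecFactory factory = new CodecFactory(conf, 1024 * 1024); // page size threshold
        CodecFactory.BytesCompressor compressor =
            factory.getCompressor(CompressionCodecName.ZSTD);
        // ... compress pages with the compressor ...
        factory.release();
      }
    }
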