diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 18c63fbe7bb1..7a44456823f6 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1171,6 +1171,15 @@ acceptedBreaks: \ java.util.function.Function, org.apache.iceberg.io.CloseableIterable,\ \ java.util.function.Consumer)" justification: "Removing deprecated code" + org.apache.iceberg:iceberg-data: + - code: "java.class.removed" + old: "class org.apache.iceberg.data.BaseDeleteLoader" + justification: "Moved from iceberg-data to iceberg-core, so this should not\ + \ cause issues" + - code: "java.class.removed" + old: "class org.apache.iceberg.data.InternalRecordWrapper" + justification: "Moved from iceberg-data to iceberg-core, so this should not\ + \ cause issues" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java index 7bd744ec5bf3..3cba5cea7f84 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java @@ -33,16 +33,22 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFileReaderService; +import org.apache.iceberg.DataFileReaderServiceRegistry; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileFormatReadBuilder; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMappingParser; @@ -322,16 +328,10 @@ CloseableIterator open(FileScanTask task) { InputFile location = getInputFile(task); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); if (task.file().format() == FileFormat.PARQUET) { - Parquet.ReadBuilder builder = - Parquet.read(location) - .project(expectedSchema) + FileFormatReadBuilder builder = + DataFileReaderServiceRegistry.read( + FileFormat.PARQUET, ColumnarBatch.class, location, expectedSchema) .split(task.start(), task.length()) - .createBatchedReaderFunc( - fileSchema -> - buildReader( - expectedSchema, - fileSchema, /* setArrowValidityVector */ - NullCheckingForGet.NULL_CHECKING_ENABLED)) .recordsPerBatch(batchSize) .filter(task.residual()) .caseSensitive(caseSensitive); @@ -388,4 +388,33 @@ private static ArrowBatchReader buildReader( ArrowBatchReader::new)); } } + + public static class ReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + public Class returnType() { + return ColumnarBatch.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return Parquet.read(inputFile) + .project(readSchema) 
+ .createBatchedReaderFunc( + fileSchema -> + VectorizedCombinedScanIterator.buildReader( + readSchema, + fileSchema, /* setArrowValidityVector */ + NullCheckingForGet.NULL_CHECKING_ENABLED)); + } + } } diff --git a/arrow/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/arrow/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..97cc384dce2d --- /dev/null +++ b/arrow/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.arrow.vectorized.ArrowReader$ReaderService diff --git a/arrow/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/arrow/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..97cc384dce2d --- /dev/null +++ b/arrow/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.arrow.vectorized.ArrowReader$ReaderService diff --git a/core/src/main/java/org/apache/iceberg/DataFileReaderService.java b/core/src/main/java/org/apache/iceberg/DataFileReaderService.java new file mode 100644 index 000000000000..1e535c4cb583 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/DataFileReaderService.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
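Editor's note (not part of the patch): the two provider-configuration resources above, under main and test resources, are what make ArrowReader$ReaderService visible to java.util.ServiceLoader; the DataFileReaderService interface they advertise is introduced just below, and the registry that consumes them follows. Purely as an illustration of what that discovery amounts to:

import java.util.ServiceLoader;
import org.apache.iceberg.DataFileReaderService;

public class ListReaderServices {
  public static void main(String[] args) {
    // Instantiates every implementation named in a
    // META-INF/services/org.apache.iceberg.DataFileReaderService resource on the classpath,
    // e.g. the arrow module's PARQUET -> ColumnarBatch service registered above.
    for (DataFileReaderService service : ServiceLoader.load(DataFileReaderService.class)) {
      System.out.println(service.format() + " -> " + service.returnType().getName());
    }
  }
}

The registry added later in this patch performs the same iteration once, in a static initializer, and indexes the services by their (format, returnType) pair.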
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.io.FileFormatReadBuilder; +import org.apache.iceberg.io.InputFile; + +/** + * Service building readers. Implementations should be registered through the {@link + * java.util.ServiceLoader}. {@link DataFileReaderServiceRegistry} is used to collect and serve the + * reader implementations. + */ +public interface DataFileReaderService { + /** + * Returns the file format which is read by the readers. + * + * @return the input format of the reader + */ + FileFormat format(); + + /** + * Returns the return type which is generated by the readers. + * + * @return the return type of the reader + */ + Class returnType(); + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param inputFile to read + * @param task to provide the values for metadata columns (_file_path, _spec_id, _partition) + * @param readSchema to use when reading the data file + * @param table to provide old partition specifications. Used for calculating values for + * _partition column after specification changes + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link FileFormatReadBuilder} for building the actual reader + */ + FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter); +} diff --git a/core/src/main/java/org/apache/iceberg/DataFileReaderServiceRegistry.java b/core/src/main/java/org/apache/iceberg/DataFileReaderServiceRegistry.java new file mode 100644 index 000000000000..eddb82aa744d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/DataFileReaderServiceRegistry.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
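Editor's note (not part of the patch): callers are not expected to instantiate these services directly; they go through DataFileReaderServiceRegistry, defined next, keyed by file format and desired return type. A hedged usage sketch, mirroring the call sites this patch adds later in BaseDeleteLoader and GenericReader (generic type parameters are elided here, as they are in the signatures shown above):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.DataFileReaderServiceRegistry;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

class RegistryReadExample {
  static void printAll(InputFile inputFile, Schema projection) {
    // The (AVRO, Record.class) key selects the service registered by iceberg-core; the
    // returned FileFormatReadBuilder is then configured like any format-specific builder.
    try (CloseableIterable<Record> rows =
        DataFileReaderServiceRegistry.read(FileFormat.AVRO, Record.class, inputFile, projection)
            .reuseContainers()
            .build()) {
      rows.forEach(System.out::println);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}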
+ */ +package org.apache.iceberg; + +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.io.FileFormatReadBuilder; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which maintains the available {@link DataFileReaderService} implementations. Based on + * the file format and the required return type the registry returns the correct {@link + * FileFormatReadBuilder} implementations which could be used to generate the readers. + */ +public class DataFileReaderServiceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DataFileReaderServiceRegistry.class); + private static final Map READ_BUILDERS = Maps.newConcurrentMap(); + + static { + ServiceLoader loader = ServiceLoader.load(DataFileReaderService.class); + for (DataFileReaderService service : loader) { + Key key = new Key(service.format(), service.returnType()); + if (READ_BUILDERS.containsKey(key)) { + throw new IllegalArgumentException( + String.format( + "Service %s clashes with %s. Both serves %s", + service.getClass(), READ_BUILDERS.get(key), key)); + } + + READ_BUILDERS.putIfAbsent(key, service); + } + + LOG.info("DataFileReaderServices found: {}", READ_BUILDERS); + } + + private DataFileReaderServiceRegistry() {} + + /** + * Provides a reader for the given {@link InputFile} which returns objects with a given + * returnType. + */ + public static FileFormatReadBuilder read( + FileFormat format, Class returnType, InputFile inputFile, Schema readSchema) { + return read(format, returnType, inputFile, null, readSchema, null, null); + } + + /** + * Provides a reader for the given {@link ContentScanTask} which returns objects with a given + * returnType. + */ + public static FileFormatReadBuilder read( + FileFormat format, + Class returnType, + InputFile inputFile, + ContentScanTask task, + Schema readSchema) { + return read(format, returnType, inputFile, task, readSchema, null, null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param task to provide the values for metadata columns (_file_path, _spec_id, _partition) + * @param readSchema to use when reading the data file + * @param table to provide old partition specifications. 
Used for calculating values for + * _partition column after specification changes + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link FileFormatReadBuilder} for building the actual reader + */ + public static FileFormatReadBuilder read( + FileFormat format, + Class returnType, + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return READ_BUILDERS + .get(new Key(format, returnType)) + .builder(inputFile, task, readSchema, table, deleteFilter); + } + + private static class Key { + private final FileFormat format; + private final Class returnType; + + private Key(FileFormat format, Class returnType) { + this.format = format; + this.returnType = returnType; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("format", format) + .add("returnType", returnType) + .toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof Key)) { + return false; + } + + Key other = (Key) o; + return Objects.equal(other.format, format) && Objects.equal(other.returnType, returnType); + } + + @Override + public int hashCode() { + return Objects.hashCode(format, returnType); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 0eaa3f2d2400..92b426a9a5bd 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -44,6 +44,8 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.avro.specific.SpecificData; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFileReaderService; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; @@ -52,6 +54,9 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.PlannedDataReader; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; @@ -60,6 +65,8 @@ import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileFormatReadBuilder; +import org.apache.iceberg.io.FileFormatReadBuilderBase; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.MappingUtil; @@ -67,8 +74,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.PartitionUtil; public class Avro { + private Avro() {} private enum Codec { @@ -615,13 +624,9 @@ public static ReadBuilder read(InputFile file) { return new ReadBuilder(file); } - public static class ReadBuilder { - private final InputFile file; + public static class ReadBuilder extends FileFormatReadBuilderBase { private final Map renames = Maps.newLinkedHashMap(); private ClassLoader loader = Thread.currentThread().getContextClassLoader(); - private NameMapping nameMapping; - private boolean reuseContainers = false; - private 
org.apache.iceberg.Schema schema = null; private Function> createReaderFunc = null; private BiFunction> createReaderBiFunc = null; private Function> createResolvingReaderFunc = null; @@ -634,12 +639,8 @@ public static class ReadBuilder { return reader; }; - private Long start = null; - private Long length = null; - private ReadBuilder(InputFile file) { - Preconditions.checkNotNull(file, "Input file cannot be null"); - this.file = file; + super(file); } public ReadBuilder createResolvingReader( @@ -668,68 +669,38 @@ public ReadBuilder createReaderFunc( return this; } - /** - * Restricts the read to the given range: [start, end = start + length). - * - * @param newStart the start position for this read - * @param newLength the length of the range this read should scan - * @return this builder for method chaining - */ - public ReadBuilder split(long newStart, long newLength) { - this.start = newStart; - this.length = newLength; - return this; - } - - public ReadBuilder project(org.apache.iceberg.Schema projectedSchema) { - this.schema = projectedSchema; - return this; - } - - public ReadBuilder reuseContainers() { - this.reuseContainers = true; - return this; - } - - public ReadBuilder reuseContainers(boolean shouldReuse) { - this.reuseContainers = shouldReuse; - return this; - } - public ReadBuilder rename(String fullName, String newName) { renames.put(fullName, newName); return this; } - public ReadBuilder withNameMapping(NameMapping newNameMapping) { - this.nameMapping = newNameMapping; - return this; - } - public ReadBuilder classLoader(ClassLoader classLoader) { this.loader = classLoader; return this; } + @Override @SuppressWarnings("unchecked") public AvroIterable build() { - Preconditions.checkNotNull(schema, "Schema is required"); + Preconditions.checkNotNull(schema(), "Schema is required"); - if (null == nameMapping) { - this.nameMapping = MappingUtil.create(schema); - } + NameMapping nameMapping = + nameMapping() != null ? 
nameMapping() : MappingUtil.create(schema()); DatumReader reader; if (createReaderBiFunc != null) { reader = new ProjectionDatumReader<>( - avroSchema -> createReaderBiFunc.apply(schema, avroSchema), schema, renames, null); + avroSchema -> createReaderBiFunc.apply(schema(), avroSchema), + schema(), + renames, + null); } else if (createReaderFunc != null) { - reader = new ProjectionDatumReader<>(createReaderFunc, schema, renames, null); + reader = new ProjectionDatumReader<>(createReaderFunc, schema(), renames, null); } else if (createResolvingReaderFunc != null) { - reader = (DatumReader) createResolvingReaderFunc.apply(schema); + reader = (DatumReader) createResolvingReaderFunc.apply(schema()); } else { - reader = (DatumReader) defaultCreateReaderFunc.apply(schema); + reader = (DatumReader) defaultCreateReaderFunc.apply(schema()); } if (reader instanceof SupportsCustomRecords) { @@ -738,7 +709,11 @@ public AvroIterable build() { } return new AvroIterable<>( - file, new NameMappingDatumReader<>(nameMapping, reader), start, length, reuseContainers); + file(), + new NameMappingDatumReader<>(nameMapping, reader), + start(), + length(), + isReuseContainers()); } } @@ -751,4 +726,31 @@ public AvroIterable build() { public static long rowCount(InputFile file) { return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE); } + + public static class ReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.AVRO; + } + + @Override + public Class returnType() { + return Record.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + org.apache.iceberg.Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return Avro.read(inputFile) + .project(readSchema) + .createResolvingReader( + fileSchema -> + PlannedDataReader.create( + fileSchema, PartitionUtil.constantsMap(task, readSchema))); + } + } } diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/core/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java similarity index 87% rename from data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java rename to core/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java index d0c50a614620..8c65d3b60dbe 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java +++ b/core/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java @@ -25,15 +25,12 @@ import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.iceberg.DataFileReaderServiceRegistry; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.avro.PlannedDataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.deletes.PositionDeleteIndexUtil; @@ -44,10 +41,6 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.orc.OrcRowReader; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.parquet.ParquetValueReader; import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -59,8 +52,6 @@ import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; -import org.apache.orc.TypeDescription; -import org.apache.parquet.schema.MessageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -230,44 +221,10 @@ private CloseableIterable openDeletes( LOG.trace("Opening delete file {}", deleteFile.location()); InputFile inputFile = loadInputFile.apply(deleteFile); - switch (format) { - case AVRO: - return Avro.read(inputFile) - .project(projection) - .reuseContainers() - .createResolvingReader(PlannedDataReader::create) - .build(); - - case PARQUET: - return Parquet.read(inputFile) - .project(projection) - .filter(filter) - .reuseContainers() - .createReaderFunc(newParquetReaderFunc(projection)) - .build(); - - case ORC: - // reusing containers is automatic for ORC, no need to call 'reuseContainers' - return ORC.read(inputFile) - .project(projection) - .filter(filter) - .createReaderFunc(newOrcReaderFunc(projection)) - .build(); - - default: - throw new UnsupportedOperationException( - String.format( - "Cannot read deletes, %s is not a supported file format: %s", - format.name(), inputFile.location())); - } - } - - private Function> newParquetReaderFunc(Schema projection) { - return fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema); - } - - private Function> newOrcReaderFunc(Schema projection) { - return fileSchema -> GenericOrcReader.buildReader(projection, fileSchema); + return DataFileReaderServiceRegistry.read(format, Record.class, inputFile, projection) + .reuseContainers() + .filter(filter) + .build(); } private Iterable execute(Iterable objects, Function func) { diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/core/src/main/java/org/apache/iceberg/data/DeleteFilter.java similarity index 100% rename from data/src/main/java/org/apache/iceberg/data/DeleteFilter.java rename to core/src/main/java/org/apache/iceberg/data/DeleteFilter.java diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java b/core/src/main/java/org/apache/iceberg/data/DeleteLoader.java similarity index 100% rename from data/src/main/java/org/apache/iceberg/data/DeleteLoader.java rename to core/src/main/java/org/apache/iceberg/data/DeleteLoader.java diff --git a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java b/core/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java similarity index 100% rename from data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java rename to core/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java diff --git a/core/src/main/java/org/apache/iceberg/io/FileFormatReadBuilder.java b/core/src/main/java/org/apache/iceberg/io/FileFormatReadBuilder.java new file mode 100644 index 000000000000..0dfcb82694c6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FileFormatReadBuilder.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
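Editor's note (not part of the patch): a practical consequence of the BaseDeleteLoader change above is that the per-format switch is gone, so reading a delete file of any supported format is the same single registry call, but the matching service must be on the classpath. The AVRO -> Record service is registered for iceberg-core later in this patch, while the ORC one is registered by the orc module at the end of it. A hedged sketch of the equivalent call, with deleteFile, loadInputFile and projection assumed in scope as they are in openDeletes:

// Same call shape for every delete file format; (format, Record.class) resolves whichever
// service the classpath provides for that format.
InputFile inputFile = loadInputFile.apply(deleteFile);
CloseableIterable<Record> deletedRows =
    DataFileReaderServiceRegistry.read(deleteFile.format(), Record.class, inputFile, projection)
        .reuseContainers()
        .build();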
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.nio.ByteBuffer; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.mapping.NameMapping; + +/** + * Interface used for configuring and creating readers for different {@link + * org.apache.iceberg.FileFormat}s. + * + * @param the type of the builder which is needed so method chaining is available for the + * builder + */ +public interface FileFormatReadBuilder> { + /** + * Restricts the read to the given range: [start, start + length). + * + * @param newStart the start position for this read + * @param newLength the length of the range this read should scan + * @return this builder for method chaining + */ + T split(long newStart, long newLength); + + T project(Schema newSchema); + + T caseInsensitive(); + + T caseSensitive(boolean newCaseSensitive); + + T filterRecords(boolean newFilterRecords); + + T filter(Expression newFilter); + + T set(String key, String value); + + T reuseContainers(); + + T reuseContainers(boolean newReuseContainers); + + T recordsPerBatch(int numRowsPerBatch); + + T withNameMapping(NameMapping newNameMapping); + + T withFileEncryptionKey(ByteBuffer encryptionKey); + + T withAADPrefix(ByteBuffer aadPrefix); + + CloseableIterable build(); +} diff --git a/core/src/main/java/org/apache/iceberg/io/FileFormatReadBuilderBase.java b/core/src/main/java/org/apache/iceberg/io/FileFormatReadBuilderBase.java new file mode 100644 index 000000000000..d7b8325c0be6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FileFormatReadBuilderBase.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Base implementation for the {@link FileFormatReadBuilder} which handles the common attributes for + * the builders. 
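Editor's note (not part of the patch): the builder interface above is parameterized with its own concrete type so that the shared, format-independent options keep returning the format-specific builder and method chaining is never lost. Assuming the declaration is the usual self-referential generic (the angle brackets are not visible in the diff as rendered here), a helper like the following relies on exactly that:

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.FileFormatReadBuilder;

class CommonReadOptions {
  // T is the concrete builder (for example Avro.ReadBuilder or ORC.ReadBuilder); every call
  // below returns T, so the caller can continue with format-specific configuration afterwards.
  static <T extends FileFormatReadBuilder<T>> T apply(
      T builder, long start, long length, Expression residual, boolean caseSensitive) {
    return builder
        .split(start, length)
        .filter(residual)
        .caseSensitive(caseSensitive)
        .reuseContainers();
  }
}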
+ * + * @param the type of the builder which is needed so method chaining is available for the + * builder + */ +public abstract class FileFormatReadBuilderBase> + implements FileFormatReadBuilder { + private final InputFile file; + private final Map properties = Maps.newHashMap(); + private Long start = null; + private Long length = null; + private Schema schema = null; + private Expression filter = null; + private boolean filterRecords = true; + private boolean caseSensitive = true; + private boolean reuseContainers = false; + private int recordsPerBatch = 10000; + private NameMapping nameMapping = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + protected FileFormatReadBuilderBase(InputFile file) { + Preconditions.checkNotNull(file, "Input file cannot be null"); + this.file = file; + } + + @Override + public T split(long newStart, long newLength) { + this.start = newStart; + this.length = newLength; + return (T) this; + } + + @Override + public T project(Schema newSchema) { + this.schema = newSchema; + return (T) this; + } + + @Override + public T caseInsensitive() { + return caseSensitive(false); + } + + @Override + public T caseSensitive(boolean newCaseSensitive) { + this.caseSensitive = newCaseSensitive; + return (T) this; + } + + @Override + public T filterRecords(boolean newFilterRecords) { + this.filterRecords = newFilterRecords; + return (T) this; + } + + @Override + public T filter(Expression newFilter) { + this.filter = newFilter; + return (T) this; + } + + @Override + public T set(String key, String value) { + properties.put(key, value); + return (T) this; + } + + @Override + public T reuseContainers() { + this.reuseContainers = true; + return (T) this; + } + + @Override + public T reuseContainers(boolean newReuseContainers) { + this.reuseContainers = newReuseContainers; + return (T) this; + } + + @Override + public T recordsPerBatch(int numRowsPerBatch) { + this.recordsPerBatch = numRowsPerBatch; + return (T) this; + } + + @Override + public T withNameMapping(NameMapping newNameMapping) { + this.nameMapping = newNameMapping; + return (T) this; + } + + @Override + public T withFileEncryptionKey(ByteBuffer encryptionKey) { + this.fileEncryptionKey = encryptionKey; + return (T) this; + } + + @Override + public T withAADPrefix(ByteBuffer aadPrefix) { + this.fileAADPrefix = aadPrefix; + return (T) this; + } + + protected InputFile file() { + return file; + } + + protected Map properties() { + return properties; + } + + protected Long start() { + return start; + } + + protected Long length() { + return length; + } + + protected Schema schema() { + return schema; + } + + protected Expression filter() { + return filter; + } + + protected boolean isFilterRecords() { + return filterRecords; + } + + protected boolean isCaseSensitive() { + return caseSensitive; + } + + protected boolean isReuseContainers() { + return reuseContainers; + } + + protected int recordsPerBatch() { + return recordsPerBatch; + } + + protected NameMapping nameMapping() { + return nameMapping; + } + + protected ByteBuffer fileEncryptionKey() { + return fileEncryptionKey; + } + + protected ByteBuffer fileAADPrefix() { + return fileAADPrefix; + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index 411d401075d6..b8c2a0bb3c85 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ 
-18,6 +18,7 @@ */ package org.apache.iceberg.util; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -25,15 +26,35 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; public class PartitionUtil { private PartitionUtil() {} + public static Map constantsMap(ContentScanTask task, Schema schema) { + if (task == null) { + return Collections.emptyMap(); + } + + Schema projectedConstantFields = + TypeUtil.select( + schema, + Sets.union(task.spec().identitySourceIds(), MetadataColumns.metadataFieldIds())); + if (!projectedConstantFields.columns().isEmpty()) { + return PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant); + } else { + return Collections.emptyMap(); + } + } + public static Map constantsMap(ContentScanTask task) { return constantsMap(task, null, (type, constant) -> constant); } diff --git a/core/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/core/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..e24e9a837677 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.avro.Avro$ReaderService diff --git a/core/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/core/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..e24e9a837677 --- /dev/null +++ b/core/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
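Editor's note (not part of the patch): the new PartitionUtil.constantsMap(task, schema) overload above centralizes logic that several readers previously duplicated. When the projected schema touches identity-partition source columns or metadata columns, it returns field-id to constant mappings, converted through IdentityPartitionConverters, so those values can be filled in without reading them from the data file; otherwise it returns an empty map. A short sketch of how a reader service typically uses it, with task and readSchema assumed in scope:

// Field ids of identity partition columns and metadata columns (e.g. _partition, _spec_id)
// mapped to constant values for this task; empty when the projection needs none of them.
Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, readSchema);
// A service then hands the constants to its value readers, for example (mirroring the ORC
// service added later in this patch):
//   GenericOrcReader.buildReader(readSchema, fileSchema, idToConstant)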
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.avro.Avro$ReaderService diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java index aaf4b76ca851..9a4576adc891 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java @@ -19,16 +19,11 @@ package org.apache.iceberg.data; import java.io.Serializable; -import java.util.Map; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFileReaderServiceRegistry; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.TableScan; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.avro.PlannedDataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -37,12 +32,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.PartitionUtil; class GenericReader implements Serializable { private final FileIO io; @@ -93,61 +83,14 @@ private CloseableIterable applyResidual( private CloseableIterable openFile(FileScanTask task, Schema fileProjection) { InputFile input = io.newInputFile(task.file().location()); - Map partition = - PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant); - switch (task.file().format()) { - case AVRO: - Avro.ReadBuilder avro = - Avro.read(input) - .project(fileProjection) - .createResolvingReader(schema -> PlannedDataReader.create(schema, partition)) - .split(task.start(), task.length()); - - if (reuseContainers) { - avro.reuseContainers(); - } - - return avro.build(); - - case PARQUET: - Parquet.ReadBuilder parquet = - Parquet.read(input) - .project(fileProjection) - .createReaderFunc( - fileSchema -> - GenericParquetReaders.buildReader(fileProjection, fileSchema, partition)) - .split(task.start(), task.length()) - .caseSensitive(caseSensitive) - .filter(task.residual()); - - if (reuseContainers) { - parquet.reuseContainers(); - } - - return parquet.build(); - - case ORC: - Schema projectionWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - fileProjection, Sets.union(partition.keySet(), MetadataColumns.metadataFieldIds())); - ORC.ReadBuilder orc = - ORC.read(input) - .project(projectionWithoutConstantAndMetadataFields) - .createReaderFunc( - fileSchema -> - GenericOrcReader.buildReader(fileProjection, fileSchema, partition)) - .split(task.start(), task.length()) - .caseSensitive(caseSensitive) - .filter(task.residual()); - - return orc.build(); - - 
default: - throw new UnsupportedOperationException( - String.format( - "Cannot read %s file: %s", task.file().format().name(), task.file().location())); - } + return DataFileReaderServiceRegistry.read( + task.file().format(), Record.class, input, task, fileProjection) + .split(task.start(), task.length()) + .caseSensitive(caseSensitive) + .reuseContainers(reuseContainers) + .filter(task.residual()) + .build(); } private class CombinedTaskIterable extends CloseableGroup implements CloseableIterable { diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index bf6f72cc287a..082846d35459 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -23,10 +23,14 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFileReaderService; +import org.apache.iceberg.DataFileReaderServiceRegistry; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.encryption.InputFilesDecryptor; @@ -42,18 +46,17 @@ import org.apache.iceberg.flink.data.RowDataUtil; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileFormatReadBuilder; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PartitionUtil; @Internal public class RowDataFileScanTaskReader implements FileScanTaskReader { - private final Schema tableSchema; private final Schema projectedSchema; private final String nameMapping; @@ -84,18 +87,10 @@ public RowDataFileScanTaskReader( @Override public CloseableIterator open( FileScanTask task, InputFilesDecryptor inputFilesDecryptor) { - Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds()); - - Map idToConstant = - partitionSchema.columns().isEmpty() - ? ImmutableMap.of() - : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); - FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor); CloseableIterable iterable = - deletes.filter( - newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor)); + deletes.filter(newIterable(task, deletes.requiredSchema(), inputFilesDecryptor)); // Project the RowData to remove the extra meta columns. 
if (!projectedSchema.sameSchema(deletes.requiredSchema())) { @@ -111,31 +106,28 @@ public CloseableIterator open( } private CloseableIterable newIterable( - FileScanTask task, - Schema schema, - Map idToConstant, - InputFilesDecryptor inputFilesDecryptor) { + FileScanTask task, Schema schema, InputFilesDecryptor inputFilesDecryptor) { CloseableIterable iter; if (task.isDataTask()) { throw new UnsupportedOperationException("Cannot read data task."); } else { - switch (task.file().format()) { - case PARQUET: - iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor); - break; - - case AVRO: - iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor); - break; - - case ORC: - iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor); - break; - - default: - throw new UnsupportedOperationException( - "Cannot read unknown format: " + task.file().format()); + FileFormatReadBuilder builder = + DataFileReaderServiceRegistry.read( + task.file().format(), + RowData.class, + inputFilesDecryptor.getInputFile(task), + task, + schema) + .split(task.start(), task.length()) + .filter(task.residual()) + .caseSensitive(caseSensitive) + .reuseContainers(); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); } + + iter = builder.build(); } if (rowFilter != null) { @@ -144,70 +136,12 @@ private CloseableIterable newIterable( return iter; } - private CloseableIterable newAvroIterable( - FileScanTask task, - Schema schema, - Map idToConstant, - InputFilesDecryptor inputFilesDecryptor) { - Avro.ReadBuilder builder = - Avro.read(inputFilesDecryptor.getInputFile(task)) - .reuseContainers() - .project(schema) - .split(task.start(), task.length()) - .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant)); - - if (nameMapping != null) { - builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - return builder.build(); - } - - private CloseableIterable newParquetIterable( - FileScanTask task, - Schema schema, - Map idToConstant, - InputFilesDecryptor inputFilesDecryptor) { - Parquet.ReadBuilder builder = - Parquet.read(inputFilesDecryptor.getInputFile(task)) - .split(task.start(), task.length()) - .project(schema) - .createReaderFunc( - fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant)) - .filter(task.residual()) - .caseSensitive(caseSensitive) - .reuseContainers(); - - if (nameMapping != null) { - builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - return builder.build(); - } + private static Map constantsMap(ContentScanTask task, Schema schema) { + Schema partitionSchema = TypeUtil.select(schema, task.spec().identitySourceIds()); - private CloseableIterable newOrcIterable( - FileScanTask task, - Schema schema, - Map idToConstant, - InputFilesDecryptor inputFilesDecryptor) { - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - - ORC.ReadBuilder builder = - ORC.read(inputFilesDecryptor.getInputFile(task)) - .project(readSchemaWithoutConstantAndMetadataFields) - .split(task.start(), task.length()) - .createReaderFunc( - readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant)) - .filter(task.residual()) - .caseSensitive(caseSensitive); - - if (nameMapping != null) { - builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - return builder.build(); + return partitionSchema.columns().isEmpty() + ? 
ImmutableMap.of() + : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); } private static class FlinkDeleteFilter extends DeleteFilter { @@ -240,4 +174,83 @@ protected InputFile getInputFile(String location) { return inputFilesDecryptor.getInputFile(location); } } + + public static class ParquetReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + public Class returnType() { + return RowData.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return Parquet.read(inputFile) + .project(readSchema) + .createReaderFunc( + fileSchema -> + FlinkParquetReaders.buildReader( + readSchema, fileSchema, constantsMap(task, readSchema))); + } + } + + public static class ORCReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.ORC; + } + + @Override + public Class returnType() { + return RowData.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + Map idToConstant = constantsMap(task, readSchema); + return ORC.read(inputFile) + .project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant)) + .createReaderFunc( + readOrcSchema -> new FlinkOrcReader(readSchema, readOrcSchema, idToConstant)); + } + } + + public static class AvroReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.AVRO; + } + + @Override + public Class returnType() { + return RowData.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return Avro.read(inputFile) + .project(readSchema) + .createReaderFunc( + fileSchema -> + FlinkPlannedAvroReader.create(readSchema, constantsMap(task, readSchema))); + } + } } diff --git a/flink/v1.20/flink/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/flink/v1.20/flink/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..b5327cf44b57 --- /dev/null +++ b/flink/v1.20/flink/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
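Editor's note (not part of the patch): the three Flink services above are listed in the provider-configuration file that follows. Because they advertise RowData.class as their return type, they coexist in the registry with the Record-based services from iceberg-core and the ColumnarBatch one from arrow without triggering the duplicate-key check. A hedged usage sketch for the Flink path, mirroring the newIterable() rewrite earlier in this file, with task, inputFile, schema and caseSensitive assumed in scope:

// (task.file().format(), RowData.class) resolves to ParquetReaderService, ORCReaderService
// or AvroReaderService above, depending on the data file being scanned.
CloseableIterable<RowData> rows =
    DataFileReaderServiceRegistry.read(
            task.file().format(), RowData.class, inputFile, task, schema)
        .split(task.start(), task.length())
        .filter(task.residual())
        .caseSensitive(caseSensitive)
        .reuseContainers()
        .build();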
+ +org.apache.iceberg.flink.source.RowDataFileScanTaskReader$AvroReaderService +org.apache.iceberg.flink.source.RowDataFileScanTaskReader$ParquetReaderService +org.apache.iceberg.flink.source.RowDataFileScanTaskReader$ORCReaderService diff --git a/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..b5327cf44b57 --- /dev/null +++ b/flink/v1.20/flink/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.flink.source.RowDataFileScanTaskReader$AvroReaderService +org.apache.iceberg.flink.source.RowDataFileScanTaskReader$ParquetReaderService +org.apache.iceberg.flink.source.RowDataFileScanTaskReader$ORCReaderService diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index 58966c666d5d..1975170edfd2 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -21,13 +21,9 @@ import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; -import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.function.BiFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -37,10 +33,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFileReaderServiceRegistry; import org.apache.iceberg.DataTableScan; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.MetadataColumns; -import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.SerializableTable; @@ -49,14 +44,10 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.data.GenericDeleteFilter; -import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.InternalRecordWrapper; -import org.apache.iceberg.data.avro.PlannedDataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; -import org.apache.iceberg.data.parquet.GenericParquetReaders; +import 
org.apache.iceberg.data.Record; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Evaluator; @@ -65,18 +56,13 @@ import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileFormatReadBuilder; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.PartitionUtil; import org.apache.iceberg.util.SerializationUtil; import org.apache.iceberg.util.ThreadPools; @@ -326,23 +312,22 @@ private CloseableIterable openTask(FileScanTask currentTask, Schema readSchem encryptionManager.decrypt( EncryptedFiles.encryptedInput(io.newInputFile(file.location()), file.keyMetadata())); - CloseableIterable iterable; - switch (file.format()) { - case AVRO: - iterable = newAvroIterable(inputFile, currentTask, readSchema); - break; - case ORC: - iterable = newOrcIterable(inputFile, currentTask, readSchema); - break; - case PARQUET: - iterable = newParquetIterable(inputFile, currentTask, readSchema); - break; - default: - throw new UnsupportedOperationException( - String.format("Cannot read %s file: %s", file.format().name(), file.location())); + FileFormatReadBuilder builder = + DataFileReaderServiceRegistry.read( + file.format(), Record.class, inputFile, currentTask, readSchema) + .filter(currentTask.residual()) + .caseSensitive(caseSensitive) + .split(currentTask.start(), currentTask.length()); + if (reuseContainers) { + builder.reuseContainers(); + } + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); } - return iterable; + CloseableIterable parquetIterator = builder.build(); + return applyResidualFiltering(parquetIterator, currentTask.residual(), readSchema); } @SuppressWarnings("unchecked") @@ -369,86 +354,6 @@ private CloseableIterable applyResidualFiltering( } } - private CloseableIterable newAvroIterable( - InputFile inputFile, FileScanTask task, Schema readSchema) { - Avro.ReadBuilder avroReadBuilder = - Avro.read(inputFile).project(readSchema).split(task.start(), task.length()); - if (reuseContainers) { - avroReadBuilder.reuseContainers(); - } - if (nameMapping != null) { - avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - avroReadBuilder.createResolvingReader( - schema -> - PlannedDataReader.create( - schema, constantsMap(task, IdentityPartitionConverters::convertConstant))); - return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema); - } - - private CloseableIterable newParquetIterable( - InputFile inputFile, FileScanTask task, Schema readSchema) { - Parquet.ReadBuilder parquetReadBuilder = - Parquet.read(inputFile) - .project(readSchema) - .filter(task.residual()) - .caseSensitive(caseSensitive) - .split(task.start(), task.length()); - if (reuseContainers) { - parquetReadBuilder.reuseContainers(); - } - if (nameMapping != null) { - 
parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - parquetReadBuilder.createReaderFunc( - fileSchema -> - GenericParquetReaders.buildReader( - readSchema, - fileSchema, - constantsMap(task, IdentityPartitionConverters::convertConstant))); - CloseableIterable parquetIterator = parquetReadBuilder.build(); - return applyResidualFiltering(parquetIterator, task.residual(), readSchema); - } - - private CloseableIterable newOrcIterable( - InputFile inputFile, FileScanTask task, Schema readSchema) { - Map idToConstant = - constantsMap(task, IdentityPartitionConverters::convertConstant); - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); - - // ORC does not support reuse containers yet - ORC.ReadBuilder orcReadBuilder = - ORC.read(inputFile) - .project(readSchemaWithoutConstantAndMetadataFields) - .filter(task.residual()) - .caseSensitive(caseSensitive) - .split(task.start(), task.length()); - orcReadBuilder.createReaderFunc( - fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema, idToConstant)); - - if (nameMapping != null) { - orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - CloseableIterable orcIterator = orcReadBuilder.build(); - return applyResidualFiltering(orcIterator, task.residual(), readSchema); - } - - private Map constantsMap( - FileScanTask task, BiFunction converter) { - PartitionSpec spec = task.spec(); - Set idColumns = spec.identitySourceIds(); - Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns); - boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); - if (projectsIdentityPartitionColumns) { - return PartitionUtil.constantsMap(task, converter); - } else { - return Collections.emptyMap(); - } - } - private static Schema readSchema( Configuration conf, Schema tableSchema, boolean caseSensitive) { Schema readSchema = InputFormatConfig.readSchema(conf); diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 451c670fcd54..a5d614377b8e 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -51,7 +51,10 @@ import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFileReaderService; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -59,6 +62,9 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.deletes.EqualityDeleteWriter; @@ -68,19 +74,22 @@ import org.apache.iceberg.encryption.NativeEncryptionInputFile; import org.apache.iceberg.encryption.NativeEncryptionOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterable; 
import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileFormatReadBuilder; +import org.apache.iceberg.io.FileFormatReadBuilderBase; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.PartitionUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.orc.CompressionKind; import org.apache.orc.OrcConf; @@ -112,6 +121,12 @@ public static WriteBuilder write(EncryptedOutputFile file) { return new WriteBuilder(file.encryptingOutputFile()); } + public static Schema schemaWithoutConstantAndMetadataFields( + Schema target, Map idToConstant) { + return TypeUtil.selectNot( + target, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); + } + public static class WriteBuilder { private final OutputFile file; private final Configuration conf; @@ -686,23 +701,13 @@ public static ReadBuilder read(InputFile file) { return new ReadBuilder(file); } - public static class ReadBuilder { - private final InputFile file; + public static class ReadBuilder extends FileFormatReadBuilderBase { private final Configuration conf; - private Schema schema = null; - private Long start = null; - private Long length = null; - private Expression filter = null; - private boolean caseSensitive = true; - private NameMapping nameMapping = null; - private Function> readerFunc; private Function> batchedReaderFunc; - private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE; - private ReadBuilder(InputFile file) { - Preconditions.checkNotNull(file, "Input file cannot be null"); - this.file = file; + ReadBuilder(InputFile file) { + super(file); if (file instanceof HadoopInputFile) { this.conf = new Configuration(((HadoopInputFile) file).getConf()); } else { @@ -712,34 +717,18 @@ private ReadBuilder(InputFile file) { // We need to turn positional schema evolution off since we use column name based schema // evolution for projection this.conf.setBoolean(OrcConf.FORCE_POSITIONAL_EVOLUTION.getHiveConfName(), false); + this.recordsPerBatch(VectorizedRowBatch.DEFAULT_SIZE); } - /** - * Restricts the read to the given range: [start, start + length). 
- * - * @param newStart the start position for this read - * @param newLength the length of the range this read should scan - * @return this builder for method chaining - */ - public ReadBuilder split(long newStart, long newLength) { - this.start = newStart; - this.length = newLength; - return this; - } - - public ReadBuilder project(Schema newSchema) { - this.schema = newSchema; - return this; - } - - public ReadBuilder caseSensitive(boolean newCaseSensitive) { - OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(this.conf, newCaseSensitive); - this.caseSensitive = newCaseSensitive; - return this; + @Deprecated + public ReadBuilder config(String property, String value) { + return set(property, value); } - public ReadBuilder config(String property, String value) { - conf.set(property, value); + @Override + public ReadBuilder set(String key, String value) { + super.set(key, value); + conf.set(key, value); return this; } @@ -751,11 +740,6 @@ public ReadBuilder createReaderFunc(Function> r return this; } - public ReadBuilder filter(Expression newFilter) { - this.filter = newFilter; - return this; - } - public ReadBuilder createBatchedReaderFunc( Function> batchReaderFunction) { Preconditions.checkArgument( @@ -765,30 +749,21 @@ public ReadBuilder createBatchedReaderFunc( return this; } - public ReadBuilder recordsPerBatch(int numRecordsPerBatch) { - this.recordsPerBatch = numRecordsPerBatch; - return this; - } - - public ReadBuilder withNameMapping(NameMapping newNameMapping) { - this.nameMapping = newNameMapping; - return this; - } - + @Override public CloseableIterable build() { - Preconditions.checkNotNull(schema, "Schema is required"); + Preconditions.checkNotNull(schema(), "Schema is required"); return new OrcIterable<>( - file, + file(), conf, - schema, - nameMapping, - start, - length, + schema(), + nameMapping(), + start(), + length(), readerFunc, - caseSensitive, - filter, + isCaseSensitive(), + filter(), batchedReaderFunc, - recordsPerBatch); + recordsPerBatch()); } } @@ -829,4 +804,30 @@ static Writer newFileWriter( return writer; } + + public static class ReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.ORC; + } + + @Override + public Class returnType() { + return Record.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + Map idToConstant = PartitionUtil.constantsMap(task, readSchema); + return new ReadBuilder(inputFile) + .project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant)) + .createReaderFunc( + fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema, idToConstant)); + } + } } diff --git a/orc/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/orc/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..b0b602c6df17 --- /dev/null +++ b/orc/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.orc.ORC$ReaderService diff --git a/orc/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/orc/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..b0b602c6df17 --- /dev/null +++ b/orc/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.orc.ORC$ReaderService diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 310435209bac..80b18d6c9367 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -60,6 +60,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFileReaderService; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.MetricsConfig; @@ -71,6 +73,9 @@ import org.apache.iceberg.SystemConfigs; import org.apache.iceberg.Table; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; @@ -79,13 +84,14 @@ import org.apache.iceberg.encryption.NativeEncryptionInputFile; import org.apache.iceberg.encryption.NativeEncryptionOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileFormatReadBuilder; +import org.apache.iceberg.io.FileFormatReadBuilderBase; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.NameMapping; @@ -99,6 +105,7 @@ 
import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.PartitionUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; @@ -1056,64 +1063,14 @@ public static ReadBuilder read(InputFile file) { } } - public static class ReadBuilder { - private final InputFile file; - private final Map properties = Maps.newHashMap(); - private Long start = null; - private Long length = null; - private Schema schema = null; - private Expression filter = null; + public static class ReadBuilder extends FileFormatReadBuilderBase { private ReadSupport readSupport = null; + private boolean callInit = false; private Function> batchedReaderFunc = null; private Function> readerFunc = null; - private boolean filterRecords = true; - private boolean caseSensitive = true; - private boolean callInit = false; - private boolean reuseContainers = false; - private int maxRecordsPerBatch = 10000; - private NameMapping nameMapping = null; - private ByteBuffer fileEncryptionKey = null; - private ByteBuffer fileAADPrefix = null; - - private ReadBuilder(InputFile file) { - this.file = file; - } - /** - * Restricts the read to the given range: [start, start + length). - * - * @param newStart the start position for this read - * @param newLength the length of the range this read should scan - * @return this builder for method chaining - */ - public ReadBuilder split(long newStart, long newLength) { - this.start = newStart; - this.length = newLength; - return this; - } - - public ReadBuilder project(Schema newSchema) { - this.schema = newSchema; - return this; - } - - public ReadBuilder caseInsensitive() { - return caseSensitive(false); - } - - public ReadBuilder caseSensitive(boolean newCaseSensitive) { - this.caseSensitive = newCaseSensitive; - return this; - } - - public ReadBuilder filterRecords(boolean newFilterRecords) { - this.filterRecords = newFilterRecords; - return this; - } - - public ReadBuilder filter(Expression newFilter) { - this.filter = newFilter; - return this; + ReadBuilder(InputFile file) { + super(file); } /** @@ -1142,11 +1099,6 @@ public ReadBuilder createBatchedReaderFunc(Function CloseableIterable build() { FileDecryptionProperties fileDecryptionProperties = null; - if (fileEncryptionKey != null) { - byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey); - byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix); + if (fileEncryptionKey() != null) { + byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey()); + byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix()); fileDecryptionProperties = FileDecryptionProperties.builder() .withFooterKey(encryptionKeyArray) .withAADPrefix(aadPrefixArray) .build(); } else { - Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); + Preconditions.checkState( + fileAADPrefix() == null, "AAD prefix set with null encryption key"); } if (readerFunc != null || batchedReaderFunc != null) { ParquetReadOptions.Builder optionsBuilder; - if (file instanceof HadoopInputFile) { + if (file() instanceof HadoopInputFile) { // remove read properties already set that may conflict with this read - Configuration conf = new Configuration(((HadoopInputFile) file).getConf()); + Configuration conf = new Configuration(((HadoopInputFile) file()).getConf()); for (String property : READ_PROPERTIES_TO_REMOVE) { conf.unset(property); 
} @@ -1209,12 +1138,12 @@ public CloseableIterable build() { optionsBuilder = ParquetReadOptions.builder(); } - for (Map.Entry entry : properties.entrySet()) { + for (Map.Entry entry : properties().entrySet()) { optionsBuilder.set(entry.getKey(), entry.getValue()); } - if (start != null) { - optionsBuilder.withRange(start, start + length); + if (start() != null) { + optionsBuilder.withRange(start(), start() + length()); } if (fileDecryptionProperties != null) { @@ -1224,8 +1153,8 @@ public CloseableIterable build() { ParquetReadOptions options = optionsBuilder.build(); NameMapping mapping; - if (nameMapping != null) { - mapping = nameMapping; + if (nameMapping() != null) { + mapping = nameMapping(); } else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) { mapping = null; } else { @@ -1234,24 +1163,31 @@ public CloseableIterable build() { if (batchedReaderFunc != null) { return new VectorizedParquetReader<>( - file, - schema, + file(), + schema(), options, batchedReaderFunc, mapping, - filter, - reuseContainers, - caseSensitive, - maxRecordsPerBatch); + filter(), + isReuseContainers(), + isCaseSensitive(), + recordsPerBatch()); } else { return new org.apache.iceberg.parquet.ParquetReader<>( - file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive); + file(), + schema(), + options, + readerFunc, + mapping, + filter(), + isReuseContainers(), + isCaseSensitive()); } } - ParquetReadBuilder builder = new ParquetReadBuilder<>(ParquetIO.file(file)); + ParquetReadBuilder builder = new ParquetReadBuilder<>(ParquetIO.file(file())); - builder.project(schema); + builder.project(schema()); if (readSupport != null) { builder.readSupport((ReadSupport) readSupport); @@ -1267,18 +1203,18 @@ public CloseableIterable build() { "parquet.avro.add-list-element-records", "false"); // assume that lists use a 3-level schema - for (Map.Entry entry : properties.entrySet()) { + for (Map.Entry entry : properties().entrySet()) { builder.set(entry.getKey(), entry.getValue()); } - if (filter != null) { + if (filter() != null) { // TODO: should not need to get the schema to push down before opening the file. 
// Parquet should allow setting a filter inside its read support ParquetReadOptions decryptOptions = ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build(); MessageType type; try (ParquetFileReader schemaReader = - ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { + ParquetFileReader.open(ParquetIO.file(file()), decryptOptions)) { type = schemaReader.getFileMetaData().getSchema(); } catch (IOException e) { throw new RuntimeIOException(e); @@ -1287,9 +1223,9 @@ public CloseableIterable build() { builder .useStatsFilter() .useDictionaryFilter() - .useRecordFilter(filterRecords) + .useRecordFilter(isFilterRecords()) .useBloomFilter() - .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive)); + .withFilter(ParquetFilters.convert(fileSchema, filter(), isCaseSensitive())); } else { // turn off filtering builder @@ -1303,12 +1239,12 @@ public CloseableIterable build() { builder.callInit(); } - if (start != null) { - builder.withFileRange(start, start + length); + if (start() != null) { + builder.withFileRange(start(), start() + length()); } - if (nameMapping != null) { - builder.withNameMapping(nameMapping); + if (nameMapping() != null) { + builder.withNameMapping(nameMapping()); } if (fileDecryptionProperties != null) { @@ -1386,4 +1322,31 @@ public static void concat( } writer.end(metadata); } + + public static class ReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + public Class returnType() { + return Record.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return new ReadBuilder(inputFile) + .project(readSchema) + .createReaderFunc( + fileSchema -> + GenericParquetReaders.buildReader( + readSchema, fileSchema, PartitionUtil.constantsMap(task, readSchema))); + } + } } diff --git a/parquet/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/parquet/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..a9e77973b3f5 --- /dev/null +++ b/parquet/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.iceberg.parquet.Parquet$ReaderService diff --git a/parquet/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/parquet/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..a9e77973b3f5 --- /dev/null +++ b/parquet/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.parquet.Parquet$ReaderService diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index 55888f7f5e82..0b7a063f546f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFileReaderServiceRegistry; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; @@ -47,11 +48,8 @@ import org.apache.iceberg.actions.RewriteTablePath; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -608,31 +606,9 @@ private ForeachFunction rewritePositionDelete( private static CloseableIterable positionDeletesReader( InputFile inputFile, FileFormat format, PartitionSpec spec) { Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); - switch (format) { - case AVRO: - return Avro.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc(DataReader::create) - .build(); - - case PARQUET: - return Parquet.read(inputFile) - .project(deleteSchema) - .reuseContainers() - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) - .build(); - - case ORC: - return ORC.read(inputFile) - .project(deleteSchema) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) - .build(); - - default: - throw new UnsupportedOperationException("Unsupported file format: " + format); - } + return 
DataFileReaderServiceRegistry.read(format, Record.class, inputFile, deleteSchema) + .reuseContainers() + .build(); } private static PositionDeleteWriter positionDeletesWriter( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index c05b694a60dc..45683a56ee5f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -19,25 +19,28 @@ package org.apache.iceberg.spark.source; import java.util.Map; -import java.util.Set; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFileReaderService; +import org.apache.iceberg.DataFileReaderServiceRegistry; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ScanTask; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileFormatReadBuilder; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; -import org.apache.iceberg.types.TypeUtil; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; abstract class BaseBatchReader extends BaseReader { + private final int batchSize; BaseBatchReader( @@ -52,45 +55,18 @@ abstract class BaseBatchReader extends BaseReader newBatchIterable( - InputFile inputFile, - FileFormat format, - long start, - long length, - Expression residual, - Map idToConstant, - SparkDeleteFilter deleteFilter) { - switch (format) { - case PARQUET: - return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter); - - case ORC: - return newOrcIterable(inputFile, start, length, residual, idToConstant); - - default: - throw new UnsupportedOperationException( - "Format: " + format + " not supported for batched reads"); - } - } - - private CloseableIterable newParquetIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant, - SparkDeleteFilter deleteFilter) { - // get required schema if there are deletes - Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema(); - - return Parquet.read(inputFile) - .project(requiredSchema) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> - VectorizedSparkParquetReaders.buildReader( - requiredSchema, fileSchema, idToConstant, deleteFilter)) + InputFile inputFile, FileScanTask task, SparkDeleteFilter deleteFilter) { + return DataFileReaderServiceRegistry.read( + task.file().format(), + ColumnarBatch.class, + inputFile, + task, + expectedSchema(), + table(), + deleteFilter) + .split(task.start(), task.length()) .recordsPerBatch(batchSize) - .filter(residual) + .filter(task.residual()) .caseSensitive(caseSensitive()) // Spark eagerly consumes the batches. 
So the underlying memory allocated could be reused // without worrying about subsequent reads clobbering over each other. This improves @@ -100,29 +76,62 @@ private CloseableIterable newParquetIterable( .build(); } - private CloseableIterable newOrcIterable( - InputFile inputFile, - long start, - long length, - Expression residual, - Map idToConstant) { - Set constantFieldIds = idToConstant.keySet(); - Set metadataFieldIds = MetadataColumns.metadataFieldIds(); - Sets.SetView constantAndMetadataFieldIds = - Sets.union(constantFieldIds, metadataFieldIds); - Schema schemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds); + public static class ParquetReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } - return ORC.read(inputFile) - .project(schemaWithoutConstantAndMetadataFields) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> - VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant)) - .recordsPerBatch(batchSize) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); + @Override + public Class returnType() { + return ColumnarBatch.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + // get required schema if there are deletes + Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema; + return Parquet.read(inputFile) + .project(requiredSchema) + .createBatchedReaderFunc( + fileSchema -> + VectorizedSparkParquetReaders.buildReader( + requiredSchema, + fileSchema, + constantsMap(task, readSchema, table), + (DeleteFilter) deleteFilter)); + } + } + + public static class ORCReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.ORC; + } + + @Override + public Class returnType() { + return ColumnarBatch.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + Map idToConstant = constantsMap(task, readSchema, table); + return ORC.read(inputFile) + .project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant)) + .createBatchedReaderFunc( + fileSchema -> + VectorizedSparkOrcReaders.buildReader(readSchema, fileSchema, idToConstant)); + } } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 6b3c3d3f2cf3..5d476e3f89f3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -190,6 +190,16 @@ private Map inputFiles() { } } + protected static Map constantsMap( + ContentScanTask task, Schema readSchema, Table tableToRead) { + if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) { + StructType partitionType = Partitioning.partitionType(tableToRead); + return PartitionUtil.constantsMap(task, partitionType, SparkUtil::internalToSpark); + } else { + return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark); + } + } + protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index 2d51992dd96a..809da93eee4c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -19,23 +19,25 @@ package org.apache.iceberg.spark.source; import java.util.Map; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFileReaderService; +import org.apache.iceberg.DataFileReaderServiceRegistry; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ScanTask; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileFormatReadBuilder; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.data.SparkOrcReader; import org.apache.iceberg.spark.data.SparkParquetReaders; import org.apache.iceberg.spark.data.SparkPlannedAvroReader; -import org.apache.iceberg.types.TypeUtil; import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { @@ -49,77 +51,93 @@ abstract class BaseRowReader extends BaseReader newIterable( - InputFile file, - FileFormat format, - long start, - long length, - Expression residual, - Schema projection, - Map idToConstant) { - switch (format) { - case PARQUET: - return newParquetIterable(file, start, length, residual, projection, idToConstant); + InputFile file, ContentScanTask task, Expression residual, Schema projection) { + return DataFileReaderServiceRegistry.read( + task.file().format(), InternalRow.class, file, task, projection, table(), null) + .reuseContainers() + .split(task.start(), task.length()) + .filter(residual) + .caseSensitive(caseSensitive()) + .withNameMapping(nameMapping()) + .build(); + } - case AVRO: - return newAvroIterable(file, start, length, projection, idToConstant); + public static class ParquetReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } - case ORC: - return newOrcIterable(file, start, length, residual, projection, idToConstant); + @Override + public Class returnType() { + return InternalRow.class; + } - default: - throw new UnsupportedOperationException("Cannot read unknown format: " + format); + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return Parquet.read(inputFile) + .project(readSchema) + .createReaderFunc( + fileSchema -> + SparkParquetReaders.buildReader( + readSchema, fileSchema, constantsMap(task, readSchema, table))); } } - private CloseableIterable newAvroIterable( - InputFile file, long start, long length, Schema projection, Map idToConstant) { - return Avro.read(file) - .reuseContainers() - .project(projection) - .split(start, length) - .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant)) - .withNameMapping(nameMapping()) - .build(); - } + public static class ORCReaderService 
implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.ORC; + } - private CloseableIterable newParquetIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - return Parquet.read(file) - .reuseContainers() - .split(start, length) - .project(readSchema) - .createReaderFunc( - fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); + @Override + public Class returnType() { + return InternalRow.class; + } + + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + Map idToConstant = constantsMap(task, readSchema, table); + return ORC.read(inputFile) + .project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant)) + .createReaderFunc( + readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)); + } } - private CloseableIterable newOrcIterable( - InputFile file, - long start, - long length, - Expression residual, - Schema readSchema, - Map idToConstant) { - Schema readSchemaWithoutConstantAndMetadataFields = - TypeUtil.selectNot( - readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); + public static class AvroReaderService implements DataFileReaderService { + @Override + public FileFormat format() { + return FileFormat.AVRO; + } + + @Override + public Class returnType() { + return InternalRow.class; + } - return ORC.read(file) - .project(readSchemaWithoutConstantAndMetadataFields) - .split(start, length) - .createReaderFunc( - readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .build(); + @Override + public FileFormatReadBuilder builder( + InputFile inputFile, + ContentScanTask task, + Schema readSchema, + Table table, + DeleteFilter deleteFilter) { + return Avro.read(inputFile) + .project(readSchema) + .createResolvingReader( + fileSchema -> + SparkPlannedAvroReader.create(fileSchema, constantsMap(task, readSchema, table))); + } } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index f45c152203ee..a95b4791f689 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.spark.source; -import java.util.Map; import java.util.stream.Stream; import org.apache.iceberg.ContentFile; import org.apache.iceberg.FileScanTask; @@ -88,8 +87,6 @@ protected CloseableIterator open(FileScanTask task) { // update the current file for Spark's filename() function InputFileBlockHolder.set(filePath, task.start(), task.length()); - Map idToConstant = constantsMap(task, expectedSchema()); - InputFile inputFile = getInputFile(filePath); Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask"); @@ -98,14 +95,6 @@ protected CloseableIterator open(FileScanTask task) { ? 
null : new SparkDeleteFilter(filePath, task.deletes(), counter(), false); - return newBatchIterable( - inputFile, - task.file().format(), - task.start(), - task.length(), - task.residual(), - idToConstant, - deleteFilter) - .iterator(); + return newBatchIterable(inputFile, task, deleteFilter).iterator(); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index a6e2891ae696..03cb02c3edf6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -132,14 +132,7 @@ private CloseableIterable rows(ContentScanTask task, Sche InputFile location = getInputFile(filePath); Preconditions.checkNotNull(location, "Could not find InputFile"); - return newIterable( - location, - task.file().format(), - task.start(), - task.length(), - task.residual(), - readSchema, - idToConstant); + return newIterable(location, task, task.residual(), readSchema); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index ee9449ee13c8..dd53a2db2063 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.spark.source; -import java.util.Map; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; @@ -45,12 +44,11 @@ protected CloseableIterator open(FileScanTask task) { // schema or rows returned by readers Schema requiredSchema = matches.requiredSchema(); - Map idToConstant = constantsMap(task, expectedSchema()); DataFile file = task.file(); // update the current file for Spark's filename() function InputFileBlockHolder.set(file.location(), task.start(), task.length()); - return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator(); + return matches.findEqualityDeleteRows(open(task, requiredSchema)).iterator(); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java index 329bcf085569..2cd2cf852389 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java @@ -95,15 +95,7 @@ protected CloseableIterator open(PositionDeletesScanTask task) { return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant); } - return newIterable( - inputFile, - task.file().format(), - task.start(), - task.length(), - residualWithoutConstants, - expectedSchema(), - idToConstant) - .iterator(); + return newIterable(inputFile, task, residualWithoutConstants, expectedSchema()).iterator(); } private Set nonConstantFieldIds(Map idToConstant) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index f24602fd5583..6ee631672eb3 100644 --- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.spark.source; -import java.util.Map; import java.util.stream.Stream; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataTask; @@ -88,30 +87,21 @@ protected CloseableIterator open(FileScanTask task) { // schema or rows returned by readers Schema requiredSchema = deleteFilter.requiredSchema(); - Map idToConstant = constantsMap(task, requiredSchema); // update the current file for Spark's filename() function InputFileBlockHolder.set(filePath, task.start(), task.length()); - return deleteFilter.filter(open(task, requiredSchema, idToConstant)).iterator(); + return deleteFilter.filter(open(task, requiredSchema)).iterator(); } - protected CloseableIterable open( - FileScanTask task, Schema readSchema, Map idToConstant) { + protected CloseableIterable open(FileScanTask task, Schema readSchema) { if (task.isDataTask()) { return newDataIterable(task.asDataTask(), readSchema); } else { InputFile inputFile = getInputFile(task.file().location()); Preconditions.checkNotNull( inputFile, "Could not find InputFile associated with FileScanTask"); - return newIterable( - inputFile, - task.file().format(), - task.start(), - task.length(), - task.residual(), - readSchema, - idToConstant); + return newIterable(inputFile, task, task.residual(), readSchema); } } diff --git a/spark/v3.5/spark/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/spark/v3.5/spark/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..65241304f478 --- /dev/null +++ b/spark/v3.5/spark/src/main/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.spark.source.BaseBatchReader$ParquetReaderService +org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService +org.apache.iceberg.spark.source.BaseRowReader$AvroReaderService +org.apache.iceberg.spark.source.BaseRowReader$ParquetReaderService +org.apache.iceberg.spark.source.BaseRowReader$ORCReaderService diff --git a/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService b/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService new file mode 100644 index 000000000000..65241304f478 --- /dev/null +++ b/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.iceberg.DataFileReaderService @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.iceberg.spark.source.BaseBatchReader$ParquetReaderService +org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService +org.apache.iceberg.spark.source.BaseRowReader$AvroReaderService +org.apache.iceberg.spark.source.BaseRowReader$ParquetReaderService +org.apache.iceberg.spark.source.BaseRowReader$ORCReaderService diff --git a/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 000000000000..01a6c4e0670d --- /dev/null +++ b/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.iceberg.spark.source.IcebergSource
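
The `META-INF/services/org.apache.iceberg.DataFileReaderService` resource files added above register the new `ReaderService` classes through Java's standard `ServiceLoader` mechanism. The `DataFileReaderServiceRegistry` implementation itself is not part of these hunks, so the sketch below is only an illustration of how such a registry can be populated from those resource files; the map layout and key format are assumptions, while `format()` and `returnType()` are the accessors declared by `DataFileReaderService` in this diff.

```java
// Illustrative sketch only: discovering DataFileReaderService implementations via
// java.util.ServiceLoader, keyed by (file format, requested return type). The real
// DataFileReaderServiceRegistry is not shown in this diff; its internals may differ.
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iceberg.DataFileReaderService;
import org.apache.iceberg.FileFormat;

public final class ReaderServiceDiscoverySketch {
  private static final Map<String, DataFileReaderService> SERVICES = new ConcurrentHashMap<>();

  static {
    // ServiceLoader reads every META-INF/services/org.apache.iceberg.DataFileReaderService
    // resource on the classpath, e.g. the ORC, Parquet and Spark registrations added here.
    for (DataFileReaderService service : ServiceLoader.load(DataFileReaderService.class)) {
      SERVICES.put(key(service.format(), service.returnType()), service);
    }
  }

  private ReaderServiceDiscoverySketch() {}

  private static String key(FileFormat format, Class<?> returnType) {
    return format.name() + ":" + returnType.getName();
  }

  // Lookup mirrors the (format, returnType) pair that callers pass to
  // DataFileReaderServiceRegistry.read(...), e.g. (PARQUET, ColumnarBatch.class).
  static DataFileReaderService find(FileFormat format, Class<?> returnType) {
    return SERVICES.get(key(format, returnType));
  }
}
```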
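
On the consumer side, each removed per-format switch collapses into a single registry lookup followed by the usual read-builder chain. The condensed sketch below mirrors the rewritten `openTask()` path above; the wrapper method and its parameters are illustrative, generic type parameters are omitted because they are not visible in this diff, and only the registry overload and builder calls that appear in the hunks are used.

```java
// Condensed consumer-side sketch of the registry-based read path, mirroring the
// rewritten IcebergInputFormat#openTask above. Wrapper method and parameters are
// illustrative; raw types are used because generics are not visible in this diff.
import org.apache.iceberg.DataFileReaderServiceRegistry;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileFormatReadBuilder;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;

final class RegistryReadSketch {
  private RegistryReadSketch() {}

  @SuppressWarnings({"unchecked", "rawtypes"})
  static CloseableIterable<Record> open(
      InputFile inputFile,
      FileScanTask task,
      Schema readSchema,
      boolean caseSensitive,
      boolean reuseContainers,
      String nameMappingJson) {
    // One lookup replaces the AVRO/ORC/PARQUET switch: the service registered for
    // (task.file().format(), Record.class) supplies the format-specific builder.
    FileFormatReadBuilder builder =
        DataFileReaderServiceRegistry.read(
                task.file().format(), Record.class, inputFile, task, readSchema)
            .filter(task.residual())
            .caseSensitive(caseSensitive)
            .split(task.start(), task.length());

    if (reuseContainers) {
      builder.reuseContainers();
    }

    if (nameMappingJson != null) {
      builder.withNameMapping(NameMappingParser.fromJson(nameMappingJson));
    }

    return builder.build();
  }
}
```

Format-specific details, such as stripping constant and metadata columns from the ORC projection via `ORC.schemaWithoutConstantAndMetadataFields` or wiring a `DeleteFilter` into the vectorized Parquet reader, now live inside the registered `ReaderService` implementations rather than at each call site.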