From 5c3b7eda471cd97308e250027d2d5c6e8481f5d1 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Sat, 24 Feb 2024 13:13:02 +0800 Subject: [PATCH 1/5] Support read iceberg mor table for Velox backend --- cpp/velox/CMakeLists.txt | 1 + cpp/velox/compute/VeloxPlanConverter.cc | 4 + cpp/velox/compute/WholeStageResultIterator.cc | 46 +++++-- cpp/velox/compute/WholeStageResultIterator.h | 2 + .../compute/iceberg/IcebergPlanConverter.cc | 81 ++++++++++++ .../compute/iceberg/IcebergPlanConverter.h | 39 ++++++ cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 + .../substrait/rel/LocalFilesNode.java | 10 +- .../substrait/proto/substrait/algebra.proto | 29 ++++- .../rel/IcebergLocalFilesBuilder.java | 17 ++- .../substrait/rel/IcebergLocalFilesNode.java | 120 ++++++++++++------ .../source/GlutenIcebergSourceUtil.scala | 28 ++-- .../execution/VeloxIcebergSuite.scala | 55 ++++++-- 13 files changed, 356 insertions(+), 79 deletions(-) create mode 100644 cpp/velox/compute/iceberg/IcebergPlanConverter.cc create mode 100644 cpp/velox/compute/iceberg/IcebergPlanConverter.h diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index b15fd395ff23..35d05d4426dc 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -296,6 +296,7 @@ set(VELOX_SRCS compute/VeloxRuntime.cc compute/WholeStageResultIterator.cc compute/VeloxPlanConverter.cc + compute/iceberg/IcebergPlanConverter.cc jni/VeloxJniWrapper.cc jni/JniFileSystem.cc jni/JniUdf.cc diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 8ca9f85cd870..370655c3b857 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -20,6 +20,7 @@ #include "compute/ResultIterator.h" #include "config/GlutenConfig.h" +#include "iceberg/IcebergPlanConverter.h" #include "operators/plannodes/RowVectorStream.h" #include "velox/common/file/FileSystems.h" @@ -93,6 +94,9 @@ std::shared_ptr parseScanSplitInfo( case SubstraitFileFormatCase::kText: 
splitInfo->format = dwio::common::FileFormat::TEXT; break; + case SubstraitFileFormatCase::kIceberg: + splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, std::move(splitInfo)); + break; default: splitInfo->format = dwio::common::FileFormat::UNKNOWN; break; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 86431819ba94..1c3ad0d8eca7 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -154,19 +154,39 @@ WholeStageResultIterator::WholeStageResultIterator( auto metadataColumn = metadataColumns[idx]; std::unordered_map> partitionKeys; constructPartitionColumns(partitionKeys, partitionColumn); - auto split = std::make_shared( - kHiveConnectorId, - paths[idx], - format, - starts[idx], - lengths[idx], - partitionKeys, - std::nullopt, - std::unordered_map(), - nullptr, - std::unordered_map(), - 0, - metadataColumn); + std::shared_ptr split; + if (auto icebergSplitInfo = std::dynamic_pointer_cast(scanInfo)) { + // Set Iceberg split. + std::unordered_map customSplitInfo{{"table_format", "hive-iceberg"}}; + auto deleteFilesFind = icebergSplitInfo->deleteFilesMap.find(paths[idx]); + auto deleteFiles = deleteFilesFind != icebergSplitInfo->deleteFilesMap.end() ? 
deleteFilesFind->second + : std::vector{}; + split = std::make_shared( + kHiveConnectorId, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deleteFiles); + } else { + split = std::make_shared( + kHiveConnectorId, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt, + std::unordered_map(), + nullptr, + std::unordered_map(), + 0, + metadataColumn); + } connectorSplits.emplace_back(split); } diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 082cc6397d73..10c1937b78ef 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -17,11 +17,13 @@ #pragma once #include "compute/Runtime.h" +#include "iceberg/IcebergPlanConverter.h" #include "memory/ColumnarBatchIterator.h" #include "memory/VeloxColumnarBatch.h" #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" #include "utils/metrics.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/core/Config.h" #include "velox/core/PlanNode.h" #include "velox/exec/Task.h" diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc new file mode 100644 index 000000000000..f4215c10c189 --- /dev/null +++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "IcebergPlanConverter.h" + +namespace gluten { + +std::shared_ptr IcebergPlanConverter::parseIcebergSplitInfo( + substrait::ReadRel_LocalFiles_FileOrFiles file, + std::shared_ptr splitInfo) { + using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::FileFormatCase; + using SubstraitDeleteFileFormatCase = + ::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::DeleteFile::FileFormatCase; + auto icebergSplitInfo = std::dynamic_pointer_cast(splitInfo) + ? std::dynamic_pointer_cast(splitInfo) + : std::make_shared(*splitInfo); + auto icebergReadOption = file.iceberg(); + switch (icebergReadOption.file_format_case()) { + case SubstraitFileFormatCase::kParquet: + icebergSplitInfo->format = dwio::common::FileFormat::PARQUET; + break; + case SubstraitFileFormatCase::kOrc: + icebergSplitInfo->format = dwio::common::FileFormat::ORC; + break; + default: + icebergSplitInfo->format = dwio::common::FileFormat::UNKNOWN; + break; + } + if (icebergReadOption.delete_files_size() > 0) { + auto deleteFiles = icebergReadOption.delete_files(); + std::vector deletes; + deletes.reserve(icebergReadOption.delete_files_size()); + for (auto i = 0; i < icebergReadOption.delete_files_size(); i++) { + auto deleteFile = icebergReadOption.delete_files().Get(i); + dwio::common::FileFormat format; + FileContent fileContent; + switch (deleteFile.file_format_case()) { + case SubstraitDeleteFileFormatCase::kParquet: + format = dwio::common::FileFormat::PARQUET; + break; + case SubstraitDeleteFileFormatCase::kOrc: + format 
= dwio::common::FileFormat::ORC; + break; + default: + format = dwio::common::FileFormat::UNKNOWN; + } + switch (deleteFile.filecontent()) { + case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_POSITION_DELETES: + fileContent = FileContent::kPositionalDeletes; + break; + case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_EQUALITY_DELETES: + fileContent = FileContent::kEqualityDeletes; + break; + default: + fileContent = FileContent::kData; + break; + } + deletes.emplace_back(IcebergDeleteFile( + fileContent, deleteFile.filepath(), format, deleteFile.recordcount(), deleteFile.filesize())); + } + icebergSplitInfo->deleteFilesMap[file.uri_file()] = std::move(deletes); + } + + return icebergSplitInfo; +} + +} // namespace gluten diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.h b/cpp/velox/compute/iceberg/IcebergPlanConverter.h new file mode 100644 index 000000000000..6ad97396ed09 --- /dev/null +++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "substrait/SubstraitToVeloxPlan.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +using namespace facebook::velox::connector::hive::iceberg; + +namespace gluten { +struct IcebergSplitInfo : SplitInfo { + std::unordered_map> deleteFilesMap; + + IcebergSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) {} +}; + +class IcebergPlanConverter { + public: + static std::shared_ptr parseIcebergSplitInfo( + substrait::ReadRel_LocalFiles_FileOrFiles file, + std::shared_ptr splitInfo); +}; + +} // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 895c1d24e768..59d3312cbb61 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -50,6 +50,9 @@ struct SplitInfo { /// The file format of the files to be scanned. dwio::common::FileFormat format; + + /// Make SplitInfo polymorphic + virtual ~SplitInfo() = default; }; /// This class is used to convert the Substrait plan into Velox plan. diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java index e0700ded29c7..852d7558eb78 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java @@ -50,7 +50,7 @@ public enum ReadFileFormat { UnknownFormat() } - private ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat; + protected ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat; private Boolean iterAsInput = false; private StructType fileSchema; private Map fileReadProperties; @@ -112,6 +112,13 @@ public List preferredLocations() { return this.preferredLocations; } + /** + * Data Lake formats require some additional processing to be done on the FileBuilder, such as + * inserting delete files information. 
Different lake formats should override this method to + * implement their corresponding logic. + */ + protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {} + public ReadRel.LocalFiles toProtobuf() { ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder(); // The input is iterator, and the path is in the format of: Iterator:index. @@ -210,6 +217,7 @@ public ReadRel.LocalFiles toProtobuf() { default: break; } + processFileBuilder(fileBuilder); localFilesBuilder.addItems(fileBuilder.build()); } return localFilesBuilder.build(); diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index 63a0f36ea66b..ac4f78c6ec9b 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -149,6 +149,28 @@ message ReadRel { uint64 max_block_size = 1; NamedStruct schema = 2 [deprecated=true]; } + message IcebergReadOptions { + enum FileContent { + DATA = 0; + POSITION_DELETES = 1; + EQUALITY_DELETES = 2; + } + message DeleteFile { + FileContent fileContent = 1; + string filePath = 2; + uint64 fileSize = 5; + uint64 recordCount = 6; + oneof file_format { + ParquetReadOptions parquet = 7; + OrcReadOptions orc = 8; + } + } + oneof file_format { + ParquetReadOptions parquet = 1; + OrcReadOptions orc = 2; + } + repeated DeleteFile delete_files = 3; + } // File reading options oneof file_format { @@ -159,22 +181,23 @@ message ReadRel { DwrfReadOptions dwrf = 13; TextReadOptions text = 14; JsonReadOptions json = 15; + IcebergReadOptions iceberg = 16; } message partitionColumn { string key = 1; string value = 2; } - repeated partitionColumn partition_columns = 16; + repeated partitionColumn partition_columns = 17; /// File schema - NamedStruct schema = 17; + NamedStruct schema = 18; message metadataColumn { string key = 1; string 
value = 2; } - repeated metadataColumn metadata_columns = 18; + repeated metadataColumn metadata_columns = 19; } } } diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java index 3452836cfd83..773f3073bbc4 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java @@ -16,13 +16,12 @@ */ package io.glutenproject.substrait.rel; +import org.apache.iceberg.DeleteFile; + import java.util.List; import java.util.Map; public class IcebergLocalFilesBuilder { - - // TODO: Add makeIcebergLocalFiles for MOR iceberg table - public static IcebergLocalFilesNode makeIcebergLocalFiles( Integer index, List paths, @@ -30,8 +29,16 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles( List lengths, List> partitionColumns, LocalFilesNode.ReadFileFormat fileFormat, - List preferredLocations) { + List preferredLocations, + Map> deleteFilesMap) { return new IcebergLocalFilesNode( - index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); + index, + paths, + starts, + lengths, + partitionColumns, + fileFormat, + preferredLocations, + deleteFilesMap); } } diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java index 98cc0d90e8fe..8e68dec4f72d 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java @@ -16,40 +16,18 @@ */ package io.glutenproject.substrait.rel; +import io.glutenproject.GlutenConfig; + +import io.substrait.proto.ReadRel; +import org.apache.iceberg.DeleteFile; + import java.util.ArrayList; +import 
java.util.Collections; import java.util.List; import java.util.Map; public class IcebergLocalFilesNode extends LocalFilesNode { - - class DeleteFile { - private final String path; - private final Integer fileContent; - private final ReadFileFormat fileFormat; - private final Long fileSize; - private final Long recordCount; - private final Map lowerBounds; - private final Map upperBounds; - - DeleteFile( - String path, - Integer fileContent, - ReadFileFormat fileFormat, - Long fileSize, - Long recordCount, - Map lowerBounds, - Map upperBounds) { - this.path = path; - this.fileContent = fileContent; - this.fileFormat = fileFormat; - this.fileSize = fileSize; - this.recordCount = recordCount; - this.lowerBounds = lowerBounds; - this.upperBounds = upperBounds; - } - } - - // TODO: Add delete file support for MOR iceberg table + private final Map> deleteFilesMap; IcebergLocalFilesNode( Integer index, @@ -58,15 +36,79 @@ class DeleteFile { List lengths, List> partitionColumns, ReadFileFormat fileFormat, - List preferredLocations) { - super( - index, - paths, - starts, - lengths, - partitionColumns, - new ArrayList<>(), - fileFormat, - preferredLocations); + List preferredLocations, + Map> deleteFilesMap) { + super(index, paths, starts, lengths, partitionColumns, new ArrayList<>(), fileFormat, preferredLocations); + this.deleteFilesMap = deleteFilesMap; + } + + @Override + protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) { + List deleteFiles = + deleteFilesMap.getOrDefault(fileBuilder.getUriFile(), Collections.emptyList()); + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder(); + + switch (fileFormat) { + case ParquetReadFormat: + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder() + .setEnableRowGroupMaxminIndex( + 
GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex()) + .build(); + icebergBuilder.setParquet(parquetReadOptions); + break; + case OrcReadFormat: + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions = + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build(); + icebergBuilder.setOrc(orcReadOptions); + break; + default: + throw new UnsupportedOperationException( + "Unsupported file format " + fileFormat.name() + " for iceberg data file."); + } + + for (DeleteFile delete : deleteFiles) { + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.Builder deleteFileBuilder = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.newBuilder(); + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent fileContent; + switch (delete.content()) { + case EQUALITY_DELETES: + fileContent = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.EQUALITY_DELETES; + break; + case POSITION_DELETES: + fileContent = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.POSITION_DELETES; + break; + default: + throw new UnsupportedOperationException( + "Unsupported FileContent " + delete.content().name() + " for delete file."); + } + deleteFileBuilder.setFileContent(fileContent); + deleteFileBuilder.setFilePath(delete.path().toString()); + deleteFileBuilder.setFileSize(delete.fileSizeInBytes()); + deleteFileBuilder.setRecordCount(delete.recordCount()); + switch (delete.format()) { + case PARQUET: + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder() + .setEnableRowGroupMaxminIndex( + GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex()) + .build(); + deleteFileBuilder.setParquet(parquetReadOptions); + break; + case ORC: + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions = + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build(); + deleteFileBuilder.setOrc(orcReadOptions); + break; + default: + throw new 
UnsupportedOperationException( + "Unsupported format " + delete.format().name() + " for delete file."); + } + icebergBuilder.addDeleteFiles(deleteFileBuilder); + } + fileBuilder.setIceberg(icebergBuilder); } } diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala index 6607ff3b9539..74add7a9a5b4 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.types.StructType -import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask} +import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat, FileScanTask, ScanTask} import java.lang.{Long => JLong} -import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -39,22 +39,21 @@ object GlutenIcebergSourceUtil { val starts = new JArrayList[JLong]() val lengths = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]]() + val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]() var fileFormat = ReadFileFormat.UnknownFormat val tasks = partition.taskGroup[ScanTask]().tasks().asScala asFileScanTask(tasks.toList).foreach { task => - paths.add(task.file().path().toString) + val filePath = task.file().path().toString + paths.add(filePath) starts.add(task.start()) lengths.add(task.length()) partitionColumns.add(getPartitionColumns(task)) - val currentFileFormat = task.file().format() match { - case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat - 
case FileFormat.ORC => ReadFileFormat.OrcReadFormat - case _ => - throw new UnsupportedOperationException( - "Iceberg Only support parquet and orc file format.") + if (!task.deletes().isEmpty) { + deleteFilesMap.put(filePath, task.deletes()) } + val currentFileFormat = convertFileFormat(task.file().format()) if (fileFormat == ReadFileFormat.UnknownFormat) { fileFormat = currentFileFormat } else if (fileFormat != currentFileFormat) { @@ -73,7 +72,8 @@ object GlutenIcebergSourceUtil { lengths, partitionColumns, fileFormat, - preferredLoc.toList.asJava + preferredLoc.toList.asJava, + deleteFilesMap ) case _ => throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.") @@ -152,4 +152,12 @@ object GlutenIcebergSourceUtil { } partitionColumns } + + def convertFileFormat(icebergFileFormat: FileFormat): ReadFileFormat = + icebergFileFormat match { + case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat + case FileFormat.ORC => ReadFileFormat.OrcReadFormat + case _ => + throw new UnsupportedOperationException("Iceberg Only support parquet and orc file format.") + } } diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala index 019c602957fe..90c024fb4f34 100644 --- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala @@ -59,15 +59,54 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { } test("iceberg transformer exists") { - spark.sql(""" - |create table iceberg_tb using iceberg as - |(select 1 as col1, 2 as col2, 3 as col3) - |""".stripMargin) + withTable("iceberg_tb") { + spark.sql(""" + |create table iceberg_tb using iceberg as + |(select 1 as col1, 2 as col2, 3 as col3) + |""".stripMargin) - runQueryAndCompare(""" - |select * from iceberg_tb; - |""".stripMargin) { - 
checkOperatorMatch[IcebergScanTransformer] + runQueryAndCompare(""" + |select * from iceberg_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } + } + } + + test("iceberg read mor table") { + withTable("iceberg_mor_tb") { + withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { + spark.sql(""" + |create table iceberg_mor_tb ( + | id int, + | name string, + | p string + |) using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read', + | 'write.update.mode' = 'merge-on-read', + | 'write.merge.mode' = 'merge-on-read' + |) + |partitioned by (p); + |""".stripMargin) + // Insert some test rows. + spark.sql(""" + |insert into table iceberg_mor_tb + |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'); + |""".stripMargin) + // Delete row. + spark.sql( + """ + |delete from iceberg_mor_tb where name = 'a1'; + |""".stripMargin + ) + } + runQueryAndCompare(""" + |select * from iceberg_mor_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } } } From ca8274145fc1c9f6afb748d0aaf5340da6ebfff6 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Mon, 11 Mar 2024 17:50:58 +0800 Subject: [PATCH 2/5] Add more test case --- .../substrait/proto/substrait/algebra.proto | 8 +- .../execution/VeloxIcebergSuite.scala | 122 ++++++++++++++++++ 2 files changed, 126 insertions(+), 4 deletions(-) diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index ac4f78c6ec9b..e9ed0f5ef4e2 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -158,11 +158,11 @@ message ReadRel { message DeleteFile { FileContent fileContent = 1; string filePath = 2; - uint64 fileSize = 5; - uint64 recordCount = 6; + uint64 fileSize = 3; + uint64 recordCount = 4; oneof file_format { - ParquetReadOptions parquet = 7; - 
OrcReadOptions orc = 8; + ParquetReadOptions parquet = 5; + OrcReadOptions orc = 6; } } oneof file_format { diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala index 90c024fb4f34..211caa1720b6 100644 --- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala @@ -353,4 +353,126 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { } } } + + test("iceberg read mor table - delete and update") { + withTable("iceberg_mor_tb") { + withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { + spark.sql(""" + |create table iceberg_mor_tb ( + | id int, + | name string, + | p string + |) using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read', + | 'write.update.mode' = 'merge-on-read', + | 'write.merge.mode' = 'merge-on-read' + |) + |partitioned by (p); + |""".stripMargin) + + // Insert some test rows. + spark.sql(""" + |insert into table iceberg_mor_tb + |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'), + | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1'); + |""".stripMargin) + + // Delete row. + spark.sql( + """ + |delete from iceberg_mor_tb where name = 'a1'; + |""".stripMargin + ) + // Update row (id is an int column, so filter on its numeric value). + spark.sql( + """ + |update iceberg_mor_tb set name = 'new_a2' where id = 2; + |""".stripMargin + ) + // Delete row again. 
+ spark.sql( + """ + |delete from iceberg_mor_tb where id = 6; + |""".stripMargin + ) + } + runQueryAndCompare(""" + |select * from iceberg_mor_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } + } + } + + test("iceberg read mor table - merge into") { + withTable("iceberg_mor_tb", "merge_into_source_tb") { + withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { + spark.sql(""" + |create table iceberg_mor_tb ( + | id int, + | name string, + | p string + |) using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read', + | 'write.update.mode' = 'merge-on-read', + | 'write.merge.mode' = 'merge-on-read' + |) + |partitioned by (p); + |""".stripMargin) + spark.sql(""" + |create table merge_into_source_tb ( + | id int, + | name string, + | p string + |) using iceberg; + |""".stripMargin) + + // Insert some test rows. + spark.sql(""" + |insert into table iceberg_mor_tb + |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'); + |""".stripMargin) + spark.sql(""" + |insert into table merge_into_source_tb + |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'), + | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2'); + |""".stripMargin) + + // Delete row. + spark.sql( + """ + |delete from iceberg_mor_tb where name = 'a1'; + |""".stripMargin + ) + // Update row (id is an int column, so filter on its numeric value). + spark.sql( + """ + |update iceberg_mor_tb set name = 'new_a2' where id = 2; + |""".stripMargin + ) + + // Merge into. 
+ spark.sql( + """ + |merge into iceberg_mor_tb t + |using (select * from merge_into_source_tb) s + |on t.id = s.id + |when matched then + | update set t.name = s.name, t.p = s.p + |when not matched then + | insert (id, name, p) values (s.id, s.name, s.p); + |""".stripMargin + ) + } + runQueryAndCompare(""" + |select * from iceberg_mor_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } + } + } } From 86d17693d8c8e9835f94e60782b3e6652afbc885 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Wed, 13 Mar 2024 09:51:56 +0800 Subject: [PATCH 3/5] Use two-dimensional vector to store delete files --- cpp/velox/compute/WholeStageResultIterator.cc | 4 +--- cpp/velox/compute/iceberg/IcebergPlanConverter.cc | 5 ++++- cpp/velox/compute/iceberg/IcebergPlanConverter.h | 7 +++++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 1c3ad0d8eca7..f645661b7fbb 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -158,9 +158,7 @@ WholeStageResultIterator::WholeStageResultIterator( if (auto icebergSplitInfo = std::dynamic_pointer_cast(scanInfo)) { // Set Iceberg split. std::unordered_map customSplitInfo{{"table_format", "hive-iceberg"}}; - auto deleteFilesFind = icebergSplitInfo->deleteFilesMap.find(paths[idx]); - auto deleteFiles = deleteFilesFind != icebergSplitInfo->deleteFilesMap.end() ? 
deleteFilesFind->second - : std::vector{}; + auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx]; split = std::make_shared( kHiveConnectorId, paths[idx], diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc index f4215c10c189..07c40e6e1c7b 100644 --- a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc +++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc @@ -72,7 +72,10 @@ std::shared_ptr IcebergPlanConverter::parseIcebergSplitInfo( deletes.emplace_back(IcebergDeleteFile( fileContent, deleteFile.filepath(), format, deleteFile.recordcount(), deleteFile.filesize())); } - icebergSplitInfo->deleteFilesMap[file.uri_file()] = std::move(deletes); + icebergSplitInfo->deleteFilesVec.emplace_back(std::move(deletes)); + } else { + // Add an empty delete files vector to indicate that this data file has no delete file. + icebergSplitInfo->deleteFilesVec.emplace_back(std::vector{}); } return icebergSplitInfo; diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.h b/cpp/velox/compute/iceberg/IcebergPlanConverter.h index 6ad97396ed09..d634a861fe3a 100644 --- a/cpp/velox/compute/iceberg/IcebergPlanConverter.h +++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.h @@ -24,9 +24,12 @@ using namespace facebook::velox::connector::hive::iceberg; namespace gluten { struct IcebergSplitInfo : SplitInfo { - std::unordered_map> deleteFilesMap; + std::vector> deleteFilesVec; - IcebergSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) {} + IcebergSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) { + // Reserve the actual size of the deleteFilesVec. 
+ deleteFilesVec.reserve(splitInfo.paths.capacity()); + } }; class IcebergPlanConverter { From 73c8eb5b7eea2f94fecd1875e998ce0f1eef07ab Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Thu, 14 Mar 2024 16:48:28 +0800 Subject: [PATCH 4/5] Rebase --- .../substrait/rel/IcebergLocalFilesNode.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java index 8e68dec4f72d..903bd198a5b5 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java @@ -38,7 +38,15 @@ public class IcebergLocalFilesNode extends LocalFilesNode { ReadFileFormat fileFormat, List preferredLocations, Map> deleteFilesMap) { - super(index, paths, starts, lengths, partitionColumns, new ArrayList<>(), fileFormat, preferredLocations); + super( + index, + paths, + starts, + lengths, + partitionColumns, + new ArrayList<>(), + fileFormat, + preferredLocations); this.deleteFilesMap = deleteFilesMap; } From 1d95748523580d7c3859c274e82aa5a038bd69ef Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Thu, 14 Mar 2024 16:51:23 +0800 Subject: [PATCH 5/5] Remove useless ut case --- .../execution/VeloxIcebergSuite.scala | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala index 211caa1720b6..6b332641eb8b 100644 --- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala @@ -73,43 +73,6 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { } } - test("iceberg read mor table") { - 
withTable("iceberg_mor_tb") { - withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { - spark.sql(""" - |create table iceberg_mor_tb ( - | id int, - | name string, - | p string - |) using iceberg - |tblproperties ( - | 'format-version' = '2', - | 'write.delete.mode' = 'merge-on-read', - | 'write.update.mode' = 'merge-on-read', - | 'write.merge.mode' = 'merge-on-read' - |) - |partitioned by (p); - |""".stripMargin) - // Insert some test rows. - spark.sql(""" - |insert into table iceberg_mor_tb - |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'); - |""".stripMargin) - // Delete row. - spark.sql( - """ - |delete from iceberg_mor_tb where name = 'a1'; - |""".stripMargin - ) - } - runQueryAndCompare(""" - |select * from iceberg_mor_tb; - |""".stripMargin) { - checkOperatorMatch[IcebergScanTransformer] - } - } - } - test("iceberg bucketed join") { assume(isSparkVersionAtleast("3.4")) val leftTable = "p_str_tb"