From 5b65e2de6b41e3b9d43e77526553c83d5d807614 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 24 Feb 2025 17:58:06 +0800
Subject: [PATCH 1/4] fixup

---
 .../velox/VeloxTransformerApi.scala           | 33 +++++++-----
 .../writer/VeloxParquetDataSource.cc          | 50 ++++++++++--------
 .../operators/writer/VeloxParquetDataSource.h |  7 +--
 cpp/velox/substrait/SubstraitToVeloxPlan.cc   | 52 +++++++++++--------
 4 files changed, 81 insertions(+), 61 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index d156fffa8b21..a46517bfadb8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -17,8 +17,11 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.{BackendsApiManager, TransformerApi}
+import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.WriteFilesExecTransformer
+import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.proto.ConfigMap
 import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
 import org.apache.gluten.utils.InputPartitionsUtil
@@ -33,7 +36,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.collection.BitSet
 
-import com.google.protobuf.{Any, Message, StringValue}
+import com.google.protobuf.{Any, Message}
 
 import java.util.{Map => JMap}
 
@@ -97,21 +100,23 @@ class VeloxTransformerApi extends TransformerApi with Logging {
   override def packPBMessage(message: Message): Any = Any.pack(message, "")
 
   override def genWriteParameters(write: WriteFilesExecTransformer): Any = {
-    val fileFormatStr = write.fileFormat match {
+    write.fileFormat match {
       case register: DataSourceRegister => register.shortName
-      case _ => "UnknownFileFormat"
+        val shortFileFormatName = register.shortName()
+        val nativeConf =
+          GlutenFormatFactory(shortFileFormatName)
+            .nativeConf(
+              write.caseInsensitiveOptions,
+              WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
+        packPBMessage(
+          ConfigMap
+            .newBuilder()
+            .putAllConfigs(nativeConf)
+            .putConfigs("format", shortFileFormatName)
+            .build())
+      case _ =>
+        throw new GlutenException("Unsupported file write format: " + write.fileFormat)
     }
-    val compressionCodec =
-      WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions).capitalize
-    val writeParametersStr = new StringBuffer("WriteParameters:")
-    writeParametersStr.append("is").append(compressionCodec).append("=1")
-    writeParametersStr.append(";format=").append(fileFormatStr).append("\n")
-
-    packPBMessage(
-      StringValue
-        .newBuilder()
-        .setValue(writeParametersStr.toString)
-        .build())
   }
 }
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.cc b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
index 15082b8b8281..4d67569d3729 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -42,25 +42,18 @@ namespace {
 const int32_t kGzipWindowBits4k = 12;
 }
 
-void VeloxParquetDataSource::initSink(const std::unordered_map& /* sparkConfs */) {
-  if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
-    sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
-  } else {
-    throw std::runtime_error("The file path is not local when writing data with parquet format in velox runtime!");
-  }
-}
-
-void VeloxParquetDataSource::init(const std::unordered_map& sparkConfs) {
-  initSink(sparkConfs);
-
+std::unique_ptr VeloxParquetDataSource::makeParquetWriteOption(
+    const std::unordered_map& sparkConfs) {
+  int64_t maxRowGroupBytes = 134217728; // 128MB
+  int64_t maxRowGroupRows = 100000000; // 100M
   if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
-    maxRowGroupBytes_ = static_cast(stoi(sparkConfs.find(kParquetBlockSize)->second));
+    maxRowGroupBytes = static_cast(stoi(sparkConfs.find(kParquetBlockSize)->second));
   }
   if (sparkConfs.find(kParquetBlockRows) != sparkConfs.end()) {
-    maxRowGroupRows_ = static_cast(stoi(sparkConfs.find(kParquetBlockRows)->second));
+    maxRowGroupRows = static_cast(stoi(sparkConfs.find(kParquetBlockRows)->second));
   }
-  velox::parquet::WriterOptions writeOption;
-  writeOption.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
+  auto writeOption = std::make_unique();
+  writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
   auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
   if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) {
     auto compressionCodecStr = sparkConfs.find(kParquetCompressionCodec)->second;
@@ -74,7 +67,7 @@ void VeloxParquetDataSource::init(const std::unordered_map
 ();
          codecOptions->window_bits = kGzipWindowBits4k;
-          writeOption.codecOptions = std::move(codecOptions);
+          writeOption->codecOptions = std::move(codecOptions);
        }
      }
    } else if (boost::iequals(compressionCodecStr, "lzo")) {
@@ -92,15 +85,28 @@ void VeloxParquetDataSource::init(const std::unordered_map
+  writeOption->compressionKind = compressionCodec;
+  writeOption->flushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() {
     return std::make_unique(
-        maxRowGroupRows_, maxRowGroupBytes_, [&]() { return false; });
+        maxRowGroupRows, maxRowGroupBytes, [&]() { return false; });
   };
-  writeOption.parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
-  auto schema = gluten::fromArrowSchema(schema_);
+  writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
+  return writeOption;
+}
+
+void VeloxParquetDataSource::initSink(const std::unordered_map& /* sparkConfs */) {
+  if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
+    sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
+  } else {
+    throw std::runtime_error("The file path is not local when writing data with parquet format in velox runtime!");
+  }
+}
 
-  parquetWriter_ = std::make_unique(std::move(sink_), writeOption, pool_, asRowType(schema));
+void VeloxParquetDataSource::init(const std::unordered_map& sparkConfs) {
+  initSink(sparkConfs);
+  auto schema = gluten::fromArrowSchema(schema_);
+  const auto writeOption = makeParquetWriteOption(sparkConfs);
+  parquetWriter_ = std::make_unique(std::move(sink_), *writeOption, pool_, asRowType(schema));
 }
 
 void VeloxParquetDataSource::inspectSchema(struct ArrowSchema* out) {
diff --git a/cpp/velox/operators/writer/VeloxParquetDataSource.h b/cpp/velox/operators/writer/VeloxParquetDataSource.h
index a94ab00b9e2a..0c9c168330a6 100644
--- a/cpp/velox/operators/writer/VeloxParquetDataSource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.h
@@ -31,6 +31,7 @@
 #include "memory/VeloxColumnarBatch.h"
"operators/writer/VeloxDataSource.h" +#include "velox/common/compression/Compression.h" #include "velox/common/file/FileSystems.h" #ifdef ENABLE_S3 #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" @@ -88,6 +89,9 @@ class VeloxParquetDataSource : public VeloxDataSource { std::shared_ptr schema) : VeloxDataSource(filePath, schema), filePath_(filePath), schema_(schema), pool_(std::move(veloxPool)) {} + static std::unique_ptr makeParquetWriteOption( + const std::unordered_map& sparkConfs); + void init(const std::unordered_map& sparkConfs) override; virtual void initSink(const std::unordered_map& sparkConfs); void inspectSchema(struct ArrowSchema* out) override; @@ -103,9 +107,6 @@ class VeloxParquetDataSource : public VeloxDataSource { std::unique_ptr sink_; private: - int64_t maxRowGroupBytes_ = 134217728; // 128MB - int64_t maxRowGroupRows_ = 100000000; // 100M - std::shared_ptr schema_; std::shared_ptr parquetWriter_; std::shared_ptr pool_; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 3661b7608487..4e0e36cefb0b 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -26,9 +26,11 @@ #include "utils/ConfigExtractor.h" +#include "config.pb.h" #include "config/GlutenConfig.h" #include "config/VeloxConfig.h" #include "operators/plannodes/RowVectorStream.h" +#include "operators/writer/VeloxParquetDataSource.h" namespace gluten { namespace { @@ -533,6 +535,7 @@ std::shared_ptr makeHiveInsertTableHandl const std::vector& partitionedBy, const std::shared_ptr& bucketProperty, const std::shared_ptr& locationHandle, + const std::shared_ptr& writerOptions, const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET, const std::optional& compressionKind = {}) { std::vector> columnHandles; @@ -578,7 +581,13 @@ std::shared_ptr makeHiveInsertTableHandl VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size()); VELOX_CHECK_EQ(numSortingColumns, sortedBy.size()); return std::make_shared( - columnHandles, locationHandle, tableStorageFormat, bucketProperty, compressionKind); + columnHandles, + locationHandle, + tableStorageFormat, + bucketProperty, + compressionKind, + std::unordered_map{}, + writerOptions); } core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) { @@ -646,30 +655,28 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: writePath = ""; } - // spark default compression code is snappy. 
-  common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
-  if (writeRel.named_table().has_advanced_extension()) {
-    if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isSnappy=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isGzip=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_GZIP;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLzo=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_LZO;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLz4=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_LZ4;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isZstd=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_ZSTD;
-    } else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isNone=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_NONE;
-    } else if (SubstraitParser::configSetInOptimization(
-                   writeRel.named_table().advanced_extension(), "isUncompressed=")) {
-      compressionCodec = common::CompressionKind::CompressionKind_NONE;
-    }
+  GLUTEN_CHECK(writeRel.named_table().has_advanced_extension(), "Advanced extension not found in WriteRel");
+  const auto& ext = writeRel.named_table().advanced_extension();
+  GLUTEN_CHECK(ext.has_optimization(), "Extension optimization not found in WriteRel");
+  const auto& opt = ext.optimization();
+  gluten::ConfigMap confMap;
+  opt.UnpackTo(&confMap);
+  std::unordered_map writeConfs;
+  for (const auto& item : *(confMap.mutable_configs())) {
+    writeConfs.emplace(item.first, item.second);
   }
 
   // Currently only support parquet format.
+  const std::string& formatShortName = writeConfs["format"];
+  GLUTEN_CHECK(formatShortName == "parquet", "Unsupported file write format: " + formatShortName);
   dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
+  const std::shared_ptr writerOptions =
+      VeloxParquetDataSource::makeParquetWriteOption(writeConfs);
+  // Spark's default compression codec is snappy.
+  const auto& compressionKind =
+      writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);
+
   return std::make_shared(
       nextPlanNodeId(),
       inputType,
@@ -682,9 +689,10 @@
           inputType->children(),
           partitionedKey,
           bucketProperty,
-          makeLocationHandle(writePath, fileFormat, compressionCodec, bucketProperty != nullptr),
+          makeLocationHandle(writePath, fileFormat, compressionKind, bucketProperty != nullptr),
+          writerOptions,
           fileFormat,
-          compressionCodec)),
+          compressionKind)),
       (!partitionedKey.empty()),
       exec::TableWriteTraits::outputType(nullptr),
       connector::CommitStrategy::kNoCommit,

From 5c9fc75b886eb53aad067d4fbbe923eadf90a7be Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 24 Feb 2025 18:43:39 +0800
Subject: [PATCH 2/4] fixup

---
 .../backendsapi/velox/VeloxTransformerApi.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index a46517bfadb8..710e364896ce 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -26,7 +26,6 @@ import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
 import org.apache.gluten.utils.InputPartitionsUtil
 import org.apache.gluten.vectorized.PlanEvaluatorJniWrapper
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.connector.read.InputPartition
@@ -35,8 +34,9 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.collection.BitSet
-
 import com.google.protobuf.{Any, Message}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.execution.HiveFileFormat
 
 import java.util.{Map => JMap}
 
@@ -101,11 +101,12 @@ class VeloxTransformerApi extends TransformerApi with Logging {
 
   override def genWriteParameters(write: WriteFilesExecTransformer): Any = {
     write.fileFormat match {
-      case register: DataSourceRegister =>
-        register.shortName
-        val shortFileFormatName = register.shortName()
+      case _ @ (_ : ParquetFileFormat | _: HiveFileFormat) =>
+        // Only Parquet is supported. It's safe to set a fixed "parquet" here
+        // because others already fell back by WriteFilesExecTransformer's validation.
+        val shortName = "parquet"
         val nativeConf =
-          GlutenFormatFactory(shortFileFormatName)
+          GlutenFormatFactory(shortName)
             .nativeConf(
               write.caseInsensitiveOptions,
               WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
@@ -113,7 +114,7 @@ class VeloxTransformerApi extends TransformerApi with Logging {
           ConfigMap
             .newBuilder()
             .putAllConfigs(nativeConf)
-            .putConfigs("format", shortFileFormatName)
+            .putConfigs("format", shortName)
             .build())
       case _ =>
         throw new GlutenException("Unsupported file write format: " + write.fileFormat)

From e865464d2c8b654a68513acc94492684fddc098d Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 24 Feb 2025 18:43:55 +0800
Subject: [PATCH 3/4] fixup

---
 .../gluten/backendsapi/velox/VeloxTransformerApi.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index 710e364896ce..f522411f648c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -26,17 +26,19 @@ import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
 import org.apache.gluten.utils.InputPartitionsUtil
 import org.apache.gluten.vectorized.PlanEvaluatorJniWrapper
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.collection.BitSet
+
 import com.google.protobuf.{Any, Message}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.hive.execution.HiveFileFormat
 
 import java.util.{Map => JMap}
 
@@ -101,7 +103,7 @@ class VeloxTransformerApi extends TransformerApi with Logging {
 
   override def genWriteParameters(write: WriteFilesExecTransformer): Any = {
     write.fileFormat match {
-      case _ @ (_ : ParquetFileFormat | _: HiveFileFormat) =>
+      case _ @(_: ParquetFileFormat | _: HiveFileFormat) =>
         // Only Parquet is supported. It's safe to set a fixed "parquet" here
         // because others already fell back by WriteFilesExecTransformer's validation.
         val shortName = "parquet"

From 75e75b86b482008073c70446f354f6e8746fdac1 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 24 Feb 2025 18:44:22 +0800
Subject: [PATCH 4/4] fixup

---
 .../apache/gluten/backendsapi/velox/VeloxTransformerApi.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index f522411f648c..9949f8822a85 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.execution.HiveFileFormat
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.collection.BitSet