[VL] Add support for some Parquet write options to 3.4 / 3.5 to align with 3.2 / 3.3 #8816

Merged · 4 commits · Feb 25, 2025
VeloxTransformerApi.scala
@@ -17,8 +17,11 @@
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.backendsapi.{BackendsApiManager, TransformerApi}
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.proto.ConfigMap
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
import org.apache.gluten.utils.InputPartitionsUtil
@@ -28,12 +31,13 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.task.TaskResources
import org.apache.spark.util.collection.BitSet

import com.google.protobuf.{Any, Message, StringValue}
import com.google.protobuf.{Any, Message}

import java.util.{Map => JMap}

@@ -97,21 +101,24 @@ class VeloxTransformerApi extends TransformerApi with Logging {
override def packPBMessage(message: Message): Any = Any.pack(message, "")

override def genWriteParameters(write: WriteFilesExecTransformer): Any = {
val fileFormatStr = write.fileFormat match {
case register: DataSourceRegister =>
register.shortName
case _ => "UnknownFileFormat"
write.fileFormat match {
case _ @(_: ParquetFileFormat | _: HiveFileFormat) =>
// Only Parquet is supported. It's safe to set a fixed "parquet" here
// because other formats have already fallen back during WriteFilesExecTransformer's validation.
val shortName = "parquet"
val nativeConf =
GlutenFormatFactory(shortName)
.nativeConf(
write.caseInsensitiveOptions,
WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
packPBMessage(
ConfigMap
.newBuilder()
.putAllConfigs(nativeConf)
.putConfigs("format", shortName)
.build())
case _ =>
throw new GlutenException("Unsupported file write format: " + write.fileFormat)
}
val compressionCodec =
WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions).capitalize
val writeParametersStr = new StringBuffer("WriteParameters:")
writeParametersStr.append("is").append(compressionCodec).append("=1")
writeParametersStr.append(";format=").append(fileFormatStr).append("\n")

packPBMessage(
StringValue
.newBuilder()
.setValue(writeParametersStr.toString)
.build())
}
}
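The net effect of this change: genWriteParameters now packs the file-format-specific write options into a gluten ConfigMap protobuf (carried as a protobuf Any in the WriteRel's advanced extension) instead of the old "WriteParameters:isSnappy=1;format=parquet"-style StringValue. A minimal sketch of the consuming side, mirroring the SubstraitToVeloxPlan.cc change further down in this PR (the function readWriteConfs and its variable names are illustrative, not from the repository):

#include <stdexcept>
#include <string>
#include <unordered_map>

#include <google/protobuf/any.pb.h>

#include "config.pb.h" // generated gluten::ConfigMap

std::unordered_map<std::string, std::string> readWriteConfs(const google::protobuf::Any& optimization) {
  // The Any written by genWriteParameters unpacks into a gluten::ConfigMap.
  gluten::ConfigMap confMap;
  if (!optimization.UnpackTo(&confMap)) {
    throw std::runtime_error("WriteRel advanced extension does not carry a gluten ConfigMap");
  }
  std::unordered_map<std::string, std::string> writeConfs;
  for (const auto& item : confMap.configs()) {
    writeConfs.emplace(item.first, item.second);
  }
  // "format" is always present ("parquet" for now); the remaining entries come from
  // GlutenFormatFactory(shortName).nativeConf, e.g. the compression codec.
  return writeConfs;
}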
50 changes: 28 additions & 22 deletions cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -42,25 +42,18 @@ namespace {
const int32_t kGzipWindowBits4k = 12;
}

void VeloxParquetDataSource::initSink(const std::unordered_map<std::string, std::string>& /* sparkConfs */) {
if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
} else {
throw std::runtime_error("The file path is not local when writing data with parquet format in velox runtime!");
}
}

void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
initSink(sparkConfs);

std::unique_ptr<facebook::velox::parquet::WriterOptions> VeloxParquetDataSource::makeParquetWriteOption(
const std::unordered_map<std::string, std::string>& sparkConfs) {
int64_t maxRowGroupBytes = 134217728; // 128MB
int64_t maxRowGroupRows = 100000000; // 100M
if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
maxRowGroupBytes_ = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
maxRowGroupBytes = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockSize)->second));
}
if (sparkConfs.find(kParquetBlockRows) != sparkConfs.end()) {
maxRowGroupRows_ = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
maxRowGroupRows = static_cast<int64_t>(stoi(sparkConfs.find(kParquetBlockRows)->second));
}
velox::parquet::WriterOptions writeOption;
writeOption.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
auto writeOption = std::make_unique<facebook::velox::parquet::WriterOptions>();
writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
if (sparkConfs.find(kParquetCompressionCodec) != sparkConfs.end()) {
auto compressionCodecStr = sparkConfs.find(kParquetCompressionCodec)->second;
@@ -74,7 +67,7 @@ void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
if (parquetGzipWindowSizeStr == kGzipWindowSize4k) {
auto codecOptions = std::make_shared<facebook::velox::parquet::arrow::util::GZipCodecOptions>();
codecOptions->window_bits = kGzipWindowBits4k;
writeOption.codecOptions = std::move(codecOptions);
writeOption->codecOptions = std::move(codecOptions);
}
}
} else if (boost::iequals(compressionCodecStr, "lzo")) {
@@ -92,15 +85,28 @@ void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
compressionCodec = CompressionKind::CompressionKind_NONE;
}
}
writeOption.compressionKind = compressionCodec;
writeOption.flushPolicyFactory = [&]() {
writeOption->compressionKind = compressionCodec;
writeOption->flushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() {
return std::make_unique<velox::parquet::LambdaFlushPolicy>(
maxRowGroupRows_, maxRowGroupBytes_, [&]() { return false; });
maxRowGroupRows, maxRowGroupBytes, [&]() { return false; });
};
writeOption.parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
auto schema = gluten::fromArrowSchema(schema_);
writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
return writeOption;
}

void VeloxParquetDataSource::initSink(const std::unordered_map<std::string, std::string>& /* sparkConfs */) {
if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
} else {
throw std::runtime_error("The file path is not local when writing data with parquet format in velox runtime!");
}
}

parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_), writeOption, pool_, asRowType(schema));
void VeloxParquetDataSource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
initSink(sparkConfs);
auto schema = gluten::fromArrowSchema(schema_);
const auto writeOption = makeParquetWriteOption(sparkConfs);
parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_), *writeOption, pool_, asRowType(schema));
}

void VeloxParquetDataSource::inspectSchema(struct ArrowSchema* out) {
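makeParquetWriteOption is now a static helper, so the same option-building logic can be reused outside the data source; SubstraitToVeloxPlan.cc below does exactly that. A minimal usage sketch, assuming VeloxParquetDataSource sits in the gluten namespace like the rest of cpp/velox (openParquetWriter and its parameters are illustrative):

#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

#include "operators/writer/VeloxParquetDataSource.h"

std::unique_ptr<facebook::velox::parquet::Writer> openParquetWriter(
    std::unique_ptr<facebook::velox::dwio::common::FileSink> sink,
    std::shared_ptr<facebook::velox::memory::MemoryPool> pool,
    const facebook::velox::RowTypePtr& rowType,
    const std::unordered_map<std::string, std::string>& sparkConfs) {
  // Keys absent from sparkConfs fall back to the defaults set in makeParquetWriteOption:
  // snappy compression, microsecond timestamps, and 128MB / 100M-row row groups.
  auto writeOption = gluten::VeloxParquetDataSource::makeParquetWriteOption(sparkConfs);
  return std::make_unique<facebook::velox::parquet::Writer>(
      std::move(sink), *writeOption, std::move(pool), rowType);
}

Note that the flush-policy lambda now captures maxRowGroupRows and maxRowGroupBytes by value: the helper's locals die when it returns, so the old by-reference capture (and the member fields that backed it) could not stay.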
7 changes: 4 additions & 3 deletions cpp/velox/operators/writer/VeloxParquetDataSource.h
@@ -31,6 +31,7 @@
#include "memory/VeloxColumnarBatch.h"
#include "operators/writer/VeloxDataSource.h"

#include "velox/common/compression/Compression.h"
#include "velox/common/file/FileSystems.h"
#ifdef ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
@@ -88,6 +89,9 @@ class VeloxParquetDataSource : public VeloxDataSource {
std::shared_ptr<arrow::Schema> schema)
: VeloxDataSource(filePath, schema), filePath_(filePath), schema_(schema), pool_(std::move(veloxPool)) {}

static std::unique_ptr<facebook::velox::parquet::WriterOptions> makeParquetWriteOption(
const std::unordered_map<std::string, std::string>& sparkConfs);

void init(const std::unordered_map<std::string, std::string>& sparkConfs) override;
virtual void initSink(const std::unordered_map<std::string, std::string>& sparkConfs);
void inspectSchema(struct ArrowSchema* out) override;
@@ -103,9 +107,6 @@ class VeloxParquetDataSource : public VeloxDataSource {
std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;

private:
int64_t maxRowGroupBytes_ = 134217728; // 128MB
int64_t maxRowGroupRows_ = 100000000; // 100M

std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<facebook::velox::parquet::Writer> parquetWriter_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
52 changes: 30 additions & 22 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -26,9 +26,11 @@

#include "utils/ConfigExtractor.h"

#include "config.pb.h"
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
#include "operators/plannodes/RowVectorStream.h"
#include "operators/writer/VeloxParquetDataSource.h"

namespace gluten {
namespace {
@@ -533,6 +535,7 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
const std::vector<std::string>& partitionedBy,
const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty,
const std::shared_ptr<connector::hive::LocationHandle>& locationHandle,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
const dwio::common::FileFormat& tableStorageFormat = dwio::common::FileFormat::PARQUET,
const std::optional<common::CompressionKind>& compressionKind = {}) {
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>> columnHandles;
@@ -578,7 +581,13 @@ std::shared_ptr<connector::hive::HiveInsertTableHandle> makeHiveInsertTableHandle(
VELOX_CHECK_EQ(numBucketColumns, bucketedBy.size());
VELOX_CHECK_EQ(numSortingColumns, sortedBy.size());
return std::make_shared<connector::hive::HiveInsertTableHandle>(
columnHandles, locationHandle, tableStorageFormat, bucketProperty, compressionKind);
columnHandles,
locationHandle,
tableStorageFormat,
bucketProperty,
compressionKind,
std::unordered_map<std::string, std::string>{},
writerOptions);
}

core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) {
@@ -646,30 +655,28 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::WriteRel& writeRel) {
writePath = "";
}

// spark default compression code is snappy.
common::CompressionKind compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
if (writeRel.named_table().has_advanced_extension()) {
if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isSnappy=")) {
compressionCodec = common::CompressionKind::CompressionKind_SNAPPY;
} else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isGzip=")) {
compressionCodec = common::CompressionKind::CompressionKind_GZIP;
} else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLzo=")) {
compressionCodec = common::CompressionKind::CompressionKind_LZO;
} else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isLz4=")) {
compressionCodec = common::CompressionKind::CompressionKind_LZ4;
} else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isZstd=")) {
compressionCodec = common::CompressionKind::CompressionKind_ZSTD;
} else if (SubstraitParser::configSetInOptimization(writeRel.named_table().advanced_extension(), "isNone=")) {
compressionCodec = common::CompressionKind::CompressionKind_NONE;
} else if (SubstraitParser::configSetInOptimization(
writeRel.named_table().advanced_extension(), "isUncompressed=")) {
compressionCodec = common::CompressionKind::CompressionKind_NONE;
}
GLUTEN_CHECK(writeRel.named_table().has_advanced_extension(), "Advanced extension not found in WriteRel");
const auto& ext = writeRel.named_table().advanced_extension();
GLUTEN_CHECK(ext.has_optimization(), "Extension optimization not found in WriteRel");
const auto& opt = ext.optimization();
gluten::ConfigMap confMap;
opt.UnpackTo(&confMap);
std::unordered_map<std::string, std::string> writeConfs;
for (const auto& item : *(confMap.mutable_configs())) {
writeConfs.emplace(item.first, item.second);
}

// Currently only the Parquet format is supported.
const std::string& formatShortName = writeConfs["format"];
GLUTEN_CHECK(formatShortName == "parquet", "Unsupported file write format: " + formatShortName);
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;

const std::shared_ptr<facebook::velox::parquet::WriterOptions> writerOptions =
VeloxParquetDataSource::makeParquetWriteOption(writeConfs);
// Spark's default compression codec is snappy.
const auto& compressionKind =
writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);

return std::make_shared<core::TableWriteNode>(
nextPlanNodeId(),
inputType,
Expand All @@ -682,9 +689,10 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
inputType->children(),
partitionedKey,
bucketProperty,
makeLocationHandle(writePath, fileFormat, compressionCodec, bucketProperty != nullptr),
makeLocationHandle(writePath, fileFormat, compressionKind, bucketProperty != nullptr),
writerOptions,
fileFormat,
compressionCodec)),
compressionKind)),
(!partitionedKey.empty()),
exec::TableWriteTraits::outputType(nullptr),
connector::CommitStrategy::kNoCommit,