diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 84b3b26fde0a6..1f316144bbfce 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -277,6 +277,7 @@ void PrestoServer::run() { registerPrestoToVeloxConnector( std::make_unique("$system@system")); + velox::exec::OutputBufferManager::initialize({}); initializeVeloxMemory(); initializeThreadPools(); diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp index e12cdbaec3f57..9797fa5205f24 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp @@ -224,6 +224,17 @@ QueryContextManager::toVeloxConfigs( traceFragmentId = it.second; } else if (it.first == SessionProperties::kQueryTraceShardId) { traceShardId = it.second; + } else if (it.first == SessionProperties::kShuffleCompressionEnabled) { + if (it.second == "true") { + // NOTE: Presto java only support lz4 compression so configure the same + // compression kind on velox. + configs[core::QueryConfig::kShuffleCompressionKind] = std::to_string( + static_cast(velox::common::CompressionKind_LZ4)); + } else { + VELOX_USER_CHECK_EQ(it.second, "false"); + configs[core::QueryConfig::kShuffleCompressionKind] = std::to_string( + static_cast(velox::common::CompressionKind_NONE)); + } } else { configs[sessionProperties_.toVeloxConfig(it.first)] = it.second; sessionProperties_.updateVeloxConfig(it.first, it.second); diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index 8966892e025c4..6d77bb8026f33 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -283,6 +283,10 @@ class SessionProperties { static constexpr const char* kPrefixSortMinRows = "native_prefixsort_min_rows"; + /// If true, enable the shuffle compression. + static constexpr const char* kShuffleCompressionEnabled = + "exchange_compression"; + SessionProperties(); const std::unordered_map>&