Skip to content

Commit

Permalink
[Native] Trim native prefix for system session property
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Oct 4, 2023
1 parent ee631de commit 5babfc1
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,14 @@ public final class SystemSessionProperties
public static final String INFER_INEQUALITY_PREDICATES = "infer_inequality_predicates";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled";
public static final String NATIVE_AGGREGATION_SPILL_MEMORY_THRESHOLD = "aggregation_spill_memory_threshold";
public static final String NATIVE_JOIN_SPILL_MEMORY_THRESHOLD = "join_spill_memory_threshold";
public static final String NATIVE_ORDER_BY_SPILL_MEMORY_THRESHOLD = "order_by_spill_memory_threshold";
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "native_simplified_expression_evaluation_enabled";
public static final String NATIVE_AGGREGATION_SPILL_MEMORY_THRESHOLD = "native_aggregation_spill_memory_threshold";
public static final String NATIVE_JOIN_SPILL_MEMORY_THRESHOLD = "native_join_spill_memory_threshold";
public static final String NATIVE_ORDER_BY_SPILL_MEMORY_THRESHOLD = "native_order_by_spill_memory_threshold";
public static final String NATIVE_MAX_SPILL_LEVEL = "native_max_spill_level";
public static final String NATIVE_SPILL_COMPRESSION_CODEC = "native_spill_compression_codec";
public static final String NATIVE_SPILL_WRITE_BUFFER_SIZE = "native_spill_write_buffer_size";
public static final String NATIVE_JOIN_SPILL_ENABLED = "native_join_spill_enabled";
public static final String NATIVE_EXECUTION_ENABLED = "native_execution_enabled";
public static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path";
public static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
Expand Down Expand Up @@ -1498,6 +1502,29 @@ public SystemSessionProperties(
"Native Execution only. The max memory that order by can use before spilling. If it is 0, then there is no limit",
0,
false),
integerProperty(
NATIVE_MAX_SPILL_LEVEL,
"Native Execution only. The maximum allowed spilling level for hash join build.\n" +
"0 is the initial spilling level, -1 means unlimited.",
4,
false),
stringProperty(
NATIVE_SPILL_COMPRESSION_CODEC,
"Native Execution only. The compression algorithm type to compress the spilled data.\n " +
"Supported compression codecs are: ZLIB, SNAPPY, LZO, ZSTD, LZ4 and GZIP. NONE means no compression.",
"none",
false),
longProperty(
NATIVE_SPILL_WRITE_BUFFER_SIZE,
"Native Execution only. The maximum size in bytes to buffer the serialized spill data before writing to disk for IO efficiency.\n" +
"If set to zero, buffering is disabled.",
1024L * 1024L,
false),
booleanProperty(
NATIVE_JOIN_SPILL_ENABLED,
"Native Execution only. Enable join spilling on native engine",
false,
false),
booleanProperty(
NATIVE_EXECUTION_ENABLED,
"Enable execution on native engine",
Expand Down Expand Up @@ -2305,6 +2332,7 @@ private static Integer validateIntegerValue(Object value, String property, int l
}
return intValue;
}

private static Double validateDoubleValueWithinSelectivityRange(Object value, String property)
{
Double number = (Double) value;
Expand Down Expand Up @@ -2742,6 +2770,7 @@ public static double getRandomizeOuterJoinNullKeyNullRatioThreshold(Session sess
{
return session.getSystemProperty(RANDOMIZE_OUTER_JOIN_NULL_KEY_NULL_RATIO_THRESHOLD, Double.class);
}

public static ShardedJoinStrategy getShardedJoinStrategy(Session session)
{
return session.getSystemProperty(SHARDED_JOINS_STRATEGY, ShardedJoinStrategy.class);
Expand All @@ -2751,6 +2780,7 @@ public static int getJoinShardCount(Session session)
{
return session.getSystemProperty(JOIN_SHARD_COUNT, Integer.class);
}

public static boolean isOptimizeConditionalAggregationEnabled(Session session)
{
return session.getSystemProperty(OPTIMIZE_CONDITIONAL_AGGREGATION_ENABLED, Boolean.class);
Expand Down
12 changes: 11 additions & 1 deletion presto-native-execution/presto_cpp/main/QueryContextManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,23 @@ folly::IOThreadPoolExecutor* spillExecutorPtr() {
}

namespace {
std::string maybeRemoveNativePrefix(const std::string& name) {
static const std::string kNativePrefix = "native_";
const auto result =
::strncmp(name.c_str(), kNativePrefix.c_str(), kNativePrefix.size());
if (result == 0) {
return name.substr(kNativePrefix.length());
}
return name;
}

std::unordered_map<std::string, std::string> toConfigs(
const protocol::SessionRepresentation& session) {
// Use base velox query config as the starting point and add Presto session
// properties on top of it.
auto configs = BaseVeloxQueryConfig::instance()->values();
for (const auto& it : session.systemProperties) {
configs[it.first] = it.second;
configs[maybeRemoveNativePrefix(it.first)] = it.second;
}

// If there's a timeZoneKey, convert to timezone name and add to the
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ add_executable(
PrestoTaskTest.cpp
QueryContextCacheTest.cpp
ServerOperationTest.cpp
TaskManagerTest.cpp)
TaskManagerTest.cpp
QueryContextManagerTest.cpp)

add_test(
NAME presto_server_test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/QueryContextManager.h"
#include <gtest/gtest.h>
#include "presto_cpp/main/TaskManager.h"

namespace facebook::presto {

class QueryContextManagerTest : public testing::Test {
protected:
void SetUp() override {
taskManager_ = std::make_unique<TaskManager>();
}
std::unique_ptr<TaskManager> taskManager_;
};

TEST_F(QueryContextManagerTest, nativeSessionProperties) {
protocol::TaskId taskId = "scan.0.0.1.0";
protocol::SessionRepresentation session{
.systemProperties = {
{"native_max_spill_level", "1"},
{"native_spill_compression_codec", "NONE"},
{"native_join_spill_enabled", "true"},
{"native_spill_write_buffer_size", "1024"},
{"aggregation_spill_all", "true"}}};
auto queryCtx = taskManager_->getQueryContextManager()->findOrCreateQueryCtx(
taskId, session);
EXPECT_EQ(queryCtx->queryConfig().maxSpillLevel(), 1);
EXPECT_EQ(queryCtx->queryConfig().spillCompressionKind(), "NONE");
EXPECT_TRUE(queryCtx->queryConfig().joinSpillEnabled());
EXPECT_EQ(queryCtx->queryConfig().spillWriteBufferSize(), 1024);
EXPECT_TRUE(queryCtx->queryConfig().aggregationSpillAll());
}

} // namespace facebook::presto

0 comments on commit 5babfc1

Please sign in to comment.