Skip to content

Commit

Permalink
Add an option to not enforce input partition for join build
Browse files Browse the repository at this point in the history
  • Loading branch information
feilong-liu committed Dec 16, 2024
1 parent 33d7c03 commit f1e1fe5
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ public final class SystemSessionProperties
private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";
public static final String NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION = "native_enforce_join_build_input_partition";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1543,6 +1544,11 @@ public SystemSessionProperties(
"Enable reuse the native process within the same JVM",
true,
false),
booleanProperty(
NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION,
"Enforce that the join build input is partitioned on join key",
featuresConfig.isNativeEnforceJoinBuildInputPartition(),
false),
booleanProperty(
RANDOMIZE_OUTER_JOIN_NULL_KEY,
"(Deprecated) Randomize null join key for outer join",
Expand Down Expand Up @@ -2888,6 +2894,11 @@ public static boolean isNativeExecutionProcessReuseEnabled(Session session)
return session.getSystemProperty(NATIVE_EXECUTION_PROCESS_REUSE_ENABLED, Boolean.class);
}

public static boolean isNativeJoinBuildPartitionEnforced(Session session)
{
return session.getSystemProperty(NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION, Boolean.class);
}

public static RandomizeOuterJoinNullKeyStrategy getRandomizeOuterJoinNullKeyStrategy(Session session)
{
// If RANDOMIZE_OUTER_JOIN_NULL_KEY is set to true, return always enabled, otherwise get strategy from RANDOMIZE_OUTER_JOIN_NULL_KEY_STRATEGY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public class FeaturesConfig
private String nativeExecutionExecutablePath = "./presto_server";
private String nativeExecutionProgramArguments = "";
private boolean nativeExecutionProcessReuseEnabled = true;
private boolean nativeEnforceJoinBuildInputPartition = true;
private boolean randomizeOuterJoinNullKey;
private RandomizeOuterJoinNullKeyStrategy randomizeOuterJoinNullKeyStrategy = RandomizeOuterJoinNullKeyStrategy.DISABLED;
private ShardedJoinStrategy shardedJoinStrategy = ShardedJoinStrategy.DISABLED;
Expand Down Expand Up @@ -2318,6 +2319,19 @@ public boolean isNativeExecutionProcessReuseEnabled()
return this.nativeExecutionProcessReuseEnabled;
}

@Config("native-enforce-join-build-input-partition")
@ConfigDescription("Enforce that the join build input is partitioned on join key")
public FeaturesConfig setNativeEnforceJoinBuildInputPartition(boolean nativeEnforceJoinBuildInputPartition)
{
this.nativeEnforceJoinBuildInputPartition = nativeEnforceJoinBuildInputPartition;
return this;
}

public boolean isNativeEnforceJoinBuildInputPartition()
{
return this.nativeEnforceJoinBuildInputPartition;
}

public boolean isRandomizeOuterJoinNullKeyEnabled()
{
return randomizeOuterJoinNullKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isEnforceFixedDistributionForOutputOperator;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeJoinBuildPartitionEnforced;
import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled;
import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
Expand Down Expand Up @@ -831,7 +832,12 @@ public PlanWithProperties visitJoin(JoinNode node, StreamPreferredProperties par
.collect(toImmutableList());
StreamPreferredProperties buildPreference;
if (getTaskConcurrency(session) > 1) {
buildPreference = exactlyPartitionedOn(buildHashVariables);
if (nativeExecution && !isNativeJoinBuildPartitionEnforced(session)) {
buildPreference = defaultParallelism(session);
}
else {
buildPreference = exactlyPartitionedOn(buildHashVariables);
}
}
else {
buildPreference = singleStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeJoinBuildPartitionEnforced;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.defaultParallelism;
Expand Down Expand Up @@ -89,7 +90,12 @@ public Void visitJoin(JoinNode node, Void context)
.collect(toImmutableList());
StreamPreferredProperties requiredBuildProperty;
if (getTaskConcurrency(session) > 1) {
requiredBuildProperty = exactlyPartitionedOn(buildJoinVariables);
if (nativeExecutionEnabled && !isNativeJoinBuildPartitionEnforced(session)) {
requiredBuildProperty = defaultParallelism(session);
}
else {
requiredBuildProperty = exactlyPartitionedOn(buildJoinVariables);
}
}
else {
requiredBuildProperty = singleStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public void testDefaults()
.setNativeExecutionExecutablePath("./presto_server")
.setNativeExecutionProgramArguments("")
.setNativeExecutionProcessReuseEnabled(true)
.setNativeEnforceJoinBuildInputPartition(true)
.setRandomizeOuterJoinNullKeyEnabled(false)
.setRandomizeOuterJoinNullKeyStrategy(RandomizeOuterJoinNullKeyStrategy.DISABLED)
.setShardedJoinStrategy(FeaturesConfig.ShardedJoinStrategy.DISABLED)
Expand Down Expand Up @@ -400,6 +401,7 @@ public void testExplicitPropertyMappings()
.put("native-execution-executable-path", "/bin/echo")
.put("native-execution-program-arguments", "--v 1")
.put("native-execution-process-reuse-enabled", "false")
.put("native-enforce-join-build-input-partition", "false")
.put("optimizer.randomize-outer-join-null-key", "true")
.put("optimizer.randomize-outer-join-null-key-strategy", "key_from_outer_join")
.put("optimizer.sharded-join-strategy", "cost_based")
Expand Down Expand Up @@ -595,6 +597,7 @@ public void testExplicitPropertyMappings()
.setNativeExecutionExecutablePath("/bin/echo")
.setNativeExecutionProgramArguments("--v 1")
.setNativeExecutionProcessReuseEnabled(false)
.setNativeEnforceJoinBuildInputPartition(false)
.setRandomizeOuterJoinNullKeyEnabled(true)
.setRandomizeOuterJoinNullKeyStrategy(RandomizeOuterJoinNullKeyStrategy.KEY_FROM_OUTER_JOIN)
.setShardedJoinStrategy(FeaturesConfig.ShardedJoinStrategy.COST_BASED)
Expand Down

0 comments on commit f1e1fe5

Please sign in to comment.