Skip to content

Commit

Permalink
Add support for user provided execution strategy
Browse files Browse the repository at this point in the history
Adds support for specifying execution strategy using
session property - spark_execution_strategies
  • Loading branch information
singcha committed Nov 8, 2023
1 parent 02889cd commit 9a2e540
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark;

import com.facebook.presto.spark.classloader_interface.ExecutionStrategy;
import com.facebook.presto.spi.PrestoException;

import java.util.Arrays;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.facebook.presto.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static java.lang.String.format;

public class ExecutionStrategyValidator
implements Consumer<String>
{
private static final Set<String> validStrategies = Arrays.stream(ExecutionStrategy.values())
.map(strategy -> strategy.name())
.collect(Collectors.toSet());

@Override
public void accept(String strategy)
{
if (!validStrategies.contains(strategy)) {
throw new PrestoException(INVALID_SESSION_PROPERTY, format("Invalid value for execution strategy [%s]. Valid values: %s", strategy, String.join(",", validStrategies)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import static java.util.Objects.requireNonNull;

public class PrestoSparkRetryExecutionSettings
public class PrestoSparkExecutionSettings
{
private final Map<String, String> sparkConfigProperties;
private final Map<String, String> prestoSessionProperties;

public PrestoSparkRetryExecutionSettings(
public PrestoSparkExecutionSettings(
Map<String, String> sparkConfigProperties,
Map<String, String> prestoSessionProperties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.facebook.presto.spark.accesscontrol.PrestoSparkAccessControlChecker;
import com.facebook.presto.spark.accesscontrol.PrestoSparkAuthenticatorProvider;
import com.facebook.presto.spark.accesscontrol.PrestoSparkCredentialsProvider;
import com.facebook.presto.spark.classloader_interface.ExecutionStrategy;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecution;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecutionFactory;
import com.facebook.presto.spark.classloader_interface.PrestoSparkConfInitializer;
Expand All @@ -65,7 +66,6 @@
import com.facebook.presto.spark.classloader_interface.PrestoSparkSession;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskExecutorFactoryProvider;
import com.facebook.presto.spark.classloader_interface.RetryExecutionStrategy;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.PrestoSparkAdaptiveQueryExecution;
import com.facebook.presto.spark.execution.PrestoSparkDataDefinitionExecution;
Expand Down Expand Up @@ -143,8 +143,8 @@
import static com.facebook.presto.server.protocol.QueryResourceUtil.toStatementStats;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isAdaptiveQueryExecutionEnabled;
import static com.facebook.presto.spark.SparkErrorCode.MALFORMED_QUERY_FILE;
import static com.facebook.presto.spark.util.PrestoSparkExecutionUtils.getExecutionSettings;
import static com.facebook.presto.spark.util.PrestoSparkFailureUtils.toPrestoSparkFailure;
import static com.facebook.presto.spark.util.PrestoSparkRetryExecutionUtils.getRetryExecutionSettings;
import static com.facebook.presto.spark.util.PrestoSparkUtils.createPagesSerde;
import static com.facebook.presto.spark.util.PrestoSparkUtils.getActionResultWithTimeout;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
Expand Down Expand Up @@ -556,7 +556,7 @@ public IPrestoSparkQueryExecution create(
PrestoSparkTaskExecutorFactoryProvider executorFactoryProvider,
Optional<String> queryStatusInfoOutputLocation,
Optional<String> queryDataOutputLocation,
List<RetryExecutionStrategy> retryExecutionStrategies,
List<ExecutionStrategy> executionStrategies,
Optional<CollectionAccumulator<Map<String, Long>>> bootstrapMetricsCollector)
{
PrestoSparkConfInitializer.checkInitialized(sparkContext);
Expand Down Expand Up @@ -619,19 +619,19 @@ public IPrestoSparkQueryExecution create(
Session session = sessionSupplier.createSession(queryId, sessionContext, warningCollectorFactory, authorizedIdentity);
session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, Optional.empty(), Optional.empty());

if (!retryExecutionStrategies.isEmpty()) {
log.info("Going to retry with following strategies: %s", retryExecutionStrategies);
PrestoSparkRetryExecutionSettings prestoSparkRetryExecutionSettings = getRetryExecutionSettings(retryExecutionStrategies, session);
if (!executionStrategies.isEmpty()) {
log.info("Going to run with following strategies: %s", executionStrategies);
PrestoSparkExecutionSettings prestoSparkExecutionSettings = getExecutionSettings(executionStrategies, session);

// Update Spark setting in SparkConf, if present
prestoSparkRetryExecutionSettings.getSparkConfigProperties().forEach(sparkContext.conf()::set);
prestoSparkExecutionSettings.getSparkConfigProperties().forEach(sparkContext.conf()::set);

// Update Presto settings in Session, if present
Session.SessionBuilder sessionBuilder = Session.builder(session);
transferSessionPropertiesToSession(sessionBuilder, prestoSparkRetryExecutionSettings.getPrestoSessionProperties());
transferSessionPropertiesToSession(sessionBuilder, prestoSparkExecutionSettings.getPrestoSessionProperties());

Set<String> clientTags = new HashSet<>(session.getClientTags());
retryExecutionStrategies.forEach(s -> clientTags.add(s.name()));
executionStrategies.forEach(s -> clientTags.add(s.name()));
sessionBuilder.setClientTags(clientTags);

session = sessionBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.base.Strings.nullToEmpty;
import static java.util.Collections.emptyList;

public class PrestoSparkSessionProperties
{
Expand All @@ -48,6 +49,7 @@ public class PrestoSparkSessionProperties
public static final String SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE = "spark_split_assignment_batch_size";
public static final String SPARK_MEMORY_REVOKING_THRESHOLD = "spark_memory_revoking_threshold";
public static final String SPARK_MEMORY_REVOKING_TARGET = "spark_memory_revoking_target";
public static final String SPARK_QUERY_EXECUTION_STRATEGIES = "spark_query_execution_strategies";
public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED = "spark_retry_on_out_of_memory_broadcast_join_enabled";
public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED = "spark_retry_on_out_of_memory_with_increased_memory_settings_enabled";
public static final String OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES = "out_of_memory_retry_presto_session_properties";
Expand All @@ -69,6 +71,7 @@ public class PrestoSparkSessionProperties
public static final String NATIVE_TRIGGER_COREDUMP_WHEN_UNRESPONSIVE_ENABLED = "native_trigger_coredump_when_unresponsive_enabled";

private final List<PropertyMetadata<?>> sessionProperties;
private final ExecutionStrategyValidator executionStrategyValidator;

public PrestoSparkSessionProperties()
{
Expand All @@ -78,6 +81,7 @@ public PrestoSparkSessionProperties()
@Inject
public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
{
executionStrategyValidator = new ExecutionStrategyValidator();
sessionProperties = ImmutableList.of(
booleanProperty(
SPARK_PARTITION_COUNT_AUTO_TUNE_ENABLED,
Expand Down Expand Up @@ -139,6 +143,19 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
"When revoking memory, try to revoke so much that memory pool is filled below target at the end",
prestoSparkConfig.getMemoryRevokingTarget(),
false),
new PropertyMetadata<>(
SPARK_QUERY_EXECUTION_STRATEGIES,
"Execution strategies to be applied while running the query",
VARCHAR,
List.class,
emptyList(),
false,
value -> {
List<String> specifiedStrategies = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(value.toString());
specifiedStrategies.forEach(strategy -> executionStrategyValidator.accept(strategy));
return specifiedStrategies;
},
value -> value),
booleanProperty(
SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED,
"Disable broadcast join on broadcast OOM and re-submit the query again within the same spark session",
Expand Down Expand Up @@ -309,6 +326,11 @@ public static double getMemoryRevokingTarget(Session session)
return session.getSystemProperty(SPARK_MEMORY_REVOKING_TARGET, Double.class);
}

public static List<String> getQueryExecutionStrategies(Session session)
{
return session.getSystemProperty(SPARK_QUERY_EXECUTION_STRATEGIES, List.class);
}

public static boolean isRetryOnOutOfMemoryBroadcastJoinEnabled(Session session)
{
return session.getSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.spark.PrestoSparkRetryExecutionSettings;
import com.facebook.presto.spark.PrestoSparkExecutionSettings;
import com.facebook.presto.spark.PrestoSparkSessionContext;
import com.facebook.presto.spark.classloader_interface.RetryExecutionStrategy;
import com.facebook.presto.spark.classloader_interface.ExecutionStrategy;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.google.common.collect.ImmutableMap;
Expand All @@ -32,21 +32,21 @@
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getOutOfMemoryRetrySparkConfigs;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_RETRY_EXECUTION_STRATEGY;

public class PrestoSparkRetryExecutionUtils
public class PrestoSparkExecutionUtils
{
private static final Logger log = Logger.get(PrestoSparkSessionContext.class);

private PrestoSparkRetryExecutionUtils() {}
private PrestoSparkExecutionUtils() {}

public static PrestoSparkRetryExecutionSettings getRetryExecutionSettings(
List<RetryExecutionStrategy> retryExecutionStrategies,
public static PrestoSparkExecutionSettings getExecutionSettings(
List<ExecutionStrategy> executionStrategies,
Session session)
{
ImmutableMap.Builder<String, String> sparkConfigProperties = new ImmutableMap.Builder<>();
ImmutableMap.Builder<String, String> prestoSessionProperties = new ImmutableMap.Builder<>();

for (RetryExecutionStrategy strategy : retryExecutionStrategies) {
log.info(String.format("Applying retry execution strategy: %s. Query Id: %s", strategy.name(), session.getQueryId().getId()));
for (ExecutionStrategy strategy : executionStrategies) {
log.info(String.format("Applying execution strategy: %s. Query Id: %s", strategy.name(), session.getQueryId().getId()));
switch (strategy) {
case DISABLE_BROADCAST_JOIN:
prestoSessionProperties.put(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.PARTITIONED.name());
Expand All @@ -63,10 +63,10 @@ public static PrestoSparkRetryExecutionSettings getRetryExecutionSettings(
prestoSessionProperties.put(HASH_PARTITION_COUNT, Long.toString(updatedPartitionCount));
break;
default:
throw new PrestoException(INVALID_RETRY_EXECUTION_STRATEGY, "Retry execution strategy not supported: " + retryExecutionStrategies);
throw new PrestoException(INVALID_RETRY_EXECUTION_STRATEGY, "Execution strategy not supported: " + executionStrategies);
}
}

return new PrestoSparkRetryExecutionSettings(sparkConfigProperties.build(), prestoSessionProperties.build());
return new PrestoSparkExecutionSettings(sparkConfigProperties.build(), prestoSessionProperties.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import com.facebook.presto.Session;
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.spark.classloader_interface.ExecutionStrategy;
import com.facebook.presto.spark.classloader_interface.PrestoSparkFailure;
import com.facebook.presto.spark.classloader_interface.RetryExecutionStrategy;
import com.facebook.presto.spi.ErrorCause;
import com.google.common.collect.ImmutableList;

Expand All @@ -30,9 +30,9 @@
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithIncreasedMemoryEnabled;
import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_OOM;
import static com.facebook.presto.spark.classloader_interface.RetryExecutionStrategy.DISABLE_BROADCAST_JOIN;
import static com.facebook.presto.spark.classloader_interface.RetryExecutionStrategy.INCREASE_CONTAINER_SIZE;
import static com.facebook.presto.spark.classloader_interface.RetryExecutionStrategy.INCREASE_HASH_PARTITION_COUNT;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.DISABLE_BROADCAST_JOIN;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_CONTAINER_SIZE;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_HASH_PARTITION_COUNT;
import static com.facebook.presto.spi.ErrorCause.LOW_PARTITION_COUNT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
Expand All @@ -49,7 +49,7 @@ public static PrestoSparkFailure toPrestoSparkFailure(Session session, Execution
PrestoSparkFailure prestoSparkFailure = toPrestoSparkFailure(executionFailureInfo);
checkState(prestoSparkFailure != null);

List<RetryExecutionStrategy> retryExecutionStrategies = getRetryExecutionStrategies(session,
List<ExecutionStrategy> retryExecutionStrategies = getRetryExecutionStrategies(session,
executionFailureInfo.getErrorCode(),
executionFailureInfo.getMessage(),
executionFailureInfo.getErrorCause());
Expand Down Expand Up @@ -90,13 +90,13 @@ private static PrestoSparkFailure toPrestoSparkFailure(ExecutionFailureInfo exec
/**
* Returns a list of retry strategies based on the provided error.
*/
private static List<RetryExecutionStrategy> getRetryExecutionStrategies(Session session, ErrorCode errorCode, String message, ErrorCause errorCause)
private static List<ExecutionStrategy> getRetryExecutionStrategies(Session session, ErrorCode errorCode, String message, ErrorCause errorCause)
{
if (errorCode == null || message == null) {
return ImmutableList.of();
}

ImmutableList.Builder<RetryExecutionStrategy> strategies = new ImmutableList.Builder<>();
ImmutableList.Builder<ExecutionStrategy> strategies = new ImmutableList.Builder<>();

if (isRetryOnOutOfMemoryBroadcastJoinEnabled(session) &&
errorCode.equals(EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT.toErrorCode())) {
Expand Down
Loading

0 comments on commit 9a2e540

Please sign in to comment.