Skip to content

Commit

Permalink
Add PrestoSparkExecutionContext for capturing different runtine info …
Browse files Browse the repository at this point in the history
…for logging
  • Loading branch information
pgupta2 committed Oct 25, 2023
1 parent b6a639b commit e9adf8c
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
ImmutableList.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of()));
ImmutableSet.of(),
Optional.empty()));

logQueryTimeline(queryInfo);
}
Expand Down Expand Up @@ -270,7 +271,8 @@ public void queryCompletedEvent(QueryInfo queryInfo)
queryInfo.getCteInformationList(),
queryInfo.getScalarFunctions(),
queryInfo.getAggregateFunctions(),
queryInfo.getWindowsFunctions()));
queryInfo.getWindowsFunctions(),
queryInfo.getPrestoSparkExecutionContext()));

logQueryTimeline(queryInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.prestospark.PrestoSparkExecutionContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.sql.planner.CanonicalPlanWithInfo;
Expand Down Expand Up @@ -102,6 +103,7 @@ public class QueryInfo
// Using a list rather than map, to avoid implementing map key deserializer
private final List<CanonicalPlanWithInfo> planCanonicalInfo;
private Map<PlanNodeId, PlanNode> planIdNodeMap;
private final Optional<PrestoSparkExecutionContext> prestoSparkExecutionContext;

@JsonCreator
public QueryInfo(
Expand Down Expand Up @@ -146,7 +148,8 @@ public QueryInfo(
@JsonProperty("aggregateFunctions") Set<String> aggregateFunctions,
@JsonProperty("windowsFunctions") Set<String> windowsFunctions,
List<CanonicalPlanWithInfo> planCanonicalInfo,
Map<PlanNodeId, PlanNode> planIdNodeMap)
Map<PlanNodeId, PlanNode> planIdNodeMap,
@JsonProperty("prestoSparkExecutionContext") Optional<PrestoSparkExecutionContext> prestoSparkExecutionContext)
{
requireNonNull(queryId, "queryId is null");
requireNonNull(session, "session is null");
Expand Down Expand Up @@ -180,6 +183,7 @@ public QueryInfo(
requireNonNull(scalarFunctions, "scalarFunctions is null");
requireNonNull(aggregateFunctions, "aggregateFunctions is null");
requireNonNull(windowsFunctions, "windowsFunctions is null");
requireNonNull(prestoSparkExecutionContext, "prestoSparkExecutionContext is null");

this.queryId = queryId;
this.session = session;
Expand Down Expand Up @@ -228,6 +232,7 @@ public QueryInfo(
this.windowsFunctions = windowsFunctions;
this.planCanonicalInfo = planCanonicalInfo == null ? ImmutableList.of() : planCanonicalInfo;
this.planIdNodeMap = planIdNodeMap == null ? ImmutableMap.of() : ImmutableMap.copyOf(planIdNodeMap);
this.prestoSparkExecutionContext = prestoSparkExecutionContext;
}

@JsonProperty
Expand Down Expand Up @@ -486,6 +491,12 @@ public Set<String> getWindowsFunctions()
return windowsFunctions;
}

@JsonProperty
public Optional<PrestoSparkExecutionContext> getPrestoSparkExecutionContext()
{
return prestoSparkExecutionContext;
}

// Don't serialize this field because it can be big
public List<CanonicalPlanWithInfo> getPlanCanonicalInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ public QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
aggregateFunctions.get(),
windowsFunctions.get(),
Optional.ofNullable(planCanonicalInfo.get()).orElseGet(ImmutableList::of),
Optional.ofNullable(planIdNodeMap.get()).orElseGet(ImmutableMap::of));
Optional.ofNullable(planIdNodeMap.get()).orElseGet(ImmutableMap::of),
Optional.empty());
}

private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo> allStages)
Expand Down Expand Up @@ -1104,7 +1105,8 @@ public void pruneQueryInfo()
queryInfo.getAggregateFunctions(),
queryInfo.getWindowsFunctions(),
ImmutableList.of(),
ImmutableMap.of());
ImmutableMap.of(),
queryInfo.getPrestoSparkExecutionContext());
finalQueryInfo.compareAndSet(finalInfo, Optional.of(prunedQueryInfo));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.spi.prestospark.PrestoSparkExecutionContext;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.sql.Serialization;
Expand Down Expand Up @@ -122,6 +123,7 @@ public void testQueryInfoRoundTrip()
assertEquals(actual.getRemovedSessionFunctions(), expected.getRemovedSessionFunctions());
// Test that planCanonicalInfo is not serialized
assertEquals(actual.getPlanCanonicalInfo(), ImmutableList.of());
assertEquals(actual.getPrestoSparkExecutionContext(), expected.getPrestoSparkExecutionContext());
}

private static JsonCodec<QueryInfo> createJsonCodec()
Expand Down Expand Up @@ -201,6 +203,7 @@ private static QueryInfo createQueryInfo()
new ValuesNode(Optional.empty(), new PlanNodeId("0"), ImmutableList.of(), ImmutableList.of(), Optional.empty()),
PlanCanonicalizationStrategy.DEFAULT),
new PlanNodeCanonicalInfo("a", ImmutableList.of()))),
ImmutableMap.of());
ImmutableMap.of(),
Optional.of(PrestoSparkExecutionContext.create(1024, 300, true, false)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public void testConstructor()
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableList.of(),
ImmutableMap.of()));
ImmutableMap.of(),
Optional.empty()));

assertEquals(basicInfo.getQueryId().getId(), "0");
assertEquals(basicInfo.getState(), RUNNING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ private QueryInfo createQueryInfo(String queryId, ResourceGroupId resourceGroupI
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableList.of(),
ImmutableMap.of());
ImmutableMap.of(),
Optional.empty());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.prestospark.PhysicalResourceSettings;
import io.airlift.units.DataSize;

import java.util.OptionalInt;

import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getAverageInputDataSizePerExecutor;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getAverageInputDataSizePerPartition;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMaxExecutorCount;
Expand All @@ -38,6 +36,15 @@ public class PrestoSparkPhysicalResourceCalculator
{
private static final Logger log = Logger.get(PrestoSparkPhysicalResourceCalculator.class);

private final int defaultHashPartitionCount;
private final int defaultMaxExecutorCount;

public PrestoSparkPhysicalResourceCalculator(int defaultHashPartitionCount, int defaultMaxExecutorCount)
{
this.defaultHashPartitionCount = defaultHashPartitionCount;
this.defaultMaxExecutorCount = defaultMaxExecutorCount;
}

/**
* Calculates the final resource settings for the query. This takes into account all override values
* with the following precedence:
Expand All @@ -57,13 +64,10 @@ public class PrestoSparkPhysicalResourceCalculator
* or {@link PrestoSparkSettingsRequirements#SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG}
* </li>
* </ul>
*
*/
public PhysicalResourceSettings calculate(PlanNode plan, PrestoSparkSourceStatsCollector prestoSparkSourceStatsCollector, Session session)
{
int hashPartitionCount = getHashPartitionCount(session);
OptionalInt maxExecutorCount = OptionalInt.empty();
PhysicalResourceSettings defaultResourceSettings = new PhysicalResourceSettings(hashPartitionCount, maxExecutorCount);
PhysicalResourceSettings defaultResourceSettings = new PhysicalResourceSettings(defaultHashPartitionCount, defaultMaxExecutorCount, false, false);

if (!anyAllocationStrategyEnabled(session)) {
log.info(String.format("ResourceAllocationStrategy disabled. Executing query %s with %s", session.getQueryId(), defaultResourceSettings));
Expand All @@ -83,16 +87,23 @@ else if (Double.isNaN(inputDataInBytes)) {
}
DataSize inputSize = new DataSize(inputDataInBytes, BYTE);

int hashPartitionCount = defaultHashPartitionCount;
int maxExecutorCount = defaultMaxExecutorCount;
boolean isHashPartitionCountAutoTuned = false;
boolean isMaxExecutorCountAutoTuned = false;
// update hashPartitionCount only if resource allocation or hash partition allocation is enabled
if (isSparkResourceAllocationStrategyEnabled(session) || isSparkHashPartitionCountAllocationStrategyEnabled(session)) {
hashPartitionCount = calculateHashPartitionCount(session, inputSize);
isHashPartitionCountAutoTuned = true;
}

// update maxExecutorCount only if resource allocation or executor allocation is enabled
if (isSparkResourceAllocationStrategyEnabled(session) || isSparkExecutorAllocationStrategyEnabled(session)) {
maxExecutorCount = OptionalInt.of(calculateExecutorCount(session, inputSize));
maxExecutorCount = calculateExecutorCount(session, inputSize);
isMaxExecutorCountAutoTuned = true;
}
PhysicalResourceSettings finalResourceSettings = new PhysicalResourceSettings(hashPartitionCount, maxExecutorCount);

PhysicalResourceSettings finalResourceSettings = new PhysicalResourceSettings(hashPartitionCount, maxExecutorCount, isHashPartitionCountAutoTuned, isMaxExecutorCountAutoTuned);

log.info(String.format("Executing query %s with %s based on resource allocation strategy", session.getQueryId(), finalResourceSettings));
return finalResourceSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.prestospark.PrestoSparkExecutionContext;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessControl;
Expand Down Expand Up @@ -336,6 +337,16 @@ public static QueryInfo createQueryInfo(
succinctBytes(peakNodeTotalMemoryInBytes),
session.getRuntimeStats());

Optional<PrestoSparkExecutionContext> prestoSparkExecutionContext = Optional.empty();
if (planAndMore.isPresent()) {
prestoSparkExecutionContext = Optional.of(
PrestoSparkExecutionContext.create(
planAndMore.get().getPhysicalResourceSettings().getHashPartitionCount(),
planAndMore.get().getPhysicalResourceSettings().getMaxExecutorCount(),
planAndMore.get().getPhysicalResourceSettings().isHashPartitionCountAutoTuned(),
planAndMore.get().getPhysicalResourceSettings().isMaxExecutorCountAutoTuned()));
}

return new QueryInfo(
session.getQueryId(),
session.toSessionRepresentation(),
Expand Down Expand Up @@ -378,7 +389,8 @@ public static QueryInfo createQueryInfo(
planAndMore.map(PlanAndMore::getInvokedAggregateFunctions).orElseGet(ImmutableSet::of),
planAndMore.map(PlanAndMore::getInvokedWindowFunctions).orElseGet(ImmutableSet::of),
planAndMore.map(PlanAndMore::getPlanCanonicalInfo).orElseGet(ImmutableList::of),
planAndMore.map(PlanAndMore::getPlan).map(Plan::getPlanIdNodeMap).orElseGet(ImmutableMap::of));
planAndMore.map(PlanAndMore::getPlan).map(Plan::getPlanIdNodeMap).orElseGet(ImmutableMap::of),
prestoSparkExecutionContext);
}

public static StageInfo createStageInfo(QueryId queryId, SubPlan plan, List<TaskInfo> taskInfos)
Expand Down Expand Up @@ -671,7 +683,7 @@ else if (preparedQuery.isExplainTypeValidate()) {
else {
VariableAllocator variableAllocator = new VariableAllocator();
PlanNodeIdAllocator planNodeIdAllocator = new PlanNodeIdAllocator();
planAndMore = queryPlanner.createQueryPlan(session, preparedQuery, warningCollector, variableAllocator, planNodeIdAllocator);
planAndMore = queryPlanner.createQueryPlan(session, preparedQuery, warningCollector, variableAllocator, planNodeIdAllocator, sparkContext);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
CollectionAccumulator<SerializedTaskInfo> taskInfoCollector = new CollectionAccumulator<>();
taskInfoCollector.register(sparkContext, Option.empty(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,9 +1024,9 @@ private void tuneMaxExecutorsCount()
{
// Executor allocation is currently only supported at root level of the plan
// In future this could be extended to fragment level configuration
if (planAndMore.getPhysicalResourceSettings().getMaxExecutorCount().isPresent()) {
if (planAndMore.getPhysicalResourceSettings().isMaxExecutorCountAutoTuned()) {
sparkContext.sc().conf().set(SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG,
Integer.toString(planAndMore.getPhysicalResourceSettings().getMaxExecutorCount().getAsInt()));
Integer.toString(planAndMore.getPhysicalResourceSettings().getMaxExecutorCount()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.facebook.presto.execution.Input;
import com.facebook.presto.execution.Output;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spark.PhysicalResourceSettings;
import com.facebook.presto.spark.PrestoSparkPhysicalResourceCalculator;
import com.facebook.presto.spark.PrestoSparkSourceStatsCollector;
import com.facebook.presto.spi.VariableAllocator;
Expand All @@ -30,6 +29,7 @@
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.prestospark.PhysicalResourceSettings;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.sql.Optimizer;
import com.facebook.presto.sql.analyzer.Analysis;
Expand All @@ -47,6 +47,7 @@
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.spark.SparkContext;

import javax.inject.Inject;

Expand All @@ -56,9 +57,11 @@
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.isLogInvokedFunctionNamesEnabled;
import static com.facebook.presto.common.RuntimeMetricName.LOGICAL_PLANNER_TIME_NANOS;
import static com.facebook.presto.common.RuntimeMetricName.OPTIMIZER_TIME_NANOS;
import static com.facebook.presto.spark.PrestoSparkSettingsRequirements.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG;
import static com.facebook.presto.spi.function.FunctionKind.AGGREGATE;
import static com.facebook.presto.spi.function.FunctionKind.SCALAR;
import static com.facebook.presto.spi.function.FunctionKind.WINDOW;
Expand Down Expand Up @@ -103,7 +106,7 @@ public PrestoSparkQueryPlanner(
this.planCanonicalInfoProvider = requireNonNull(historyBasedPlanStatisticsManager, "historyBasedPlanStatisticsManager is null").getPlanCanonicalInfoProvider();
}

public PlanAndMore createQueryPlan(Session session, BuiltInPreparedQuery preparedQuery, WarningCollector warningCollector, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
public PlanAndMore createQueryPlan(Session session, BuiltInPreparedQuery preparedQuery, WarningCollector warningCollector, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, SparkContext sparkContext)
{
Analyzer analyzer = new Analyzer(
session,
Expand Down Expand Up @@ -149,7 +152,10 @@ public PlanAndMore createQueryPlan(Session session, BuiltInPreparedQuery prepare
Optional<Output> output = new OutputExtractor().extractOutput(plan.getRoot());
Optional<QueryType> queryType = getQueryType(preparedQuery.getStatement().getClass());
List<String> columnNames = ((OutputNode) plan.getRoot()).getColumnNames();
PhysicalResourceSettings physicalResourceSettings = new PrestoSparkPhysicalResourceCalculator()
PrestoSparkPhysicalResourceCalculator prestoSparkPhysicalResourceCalculator = new PrestoSparkPhysicalResourceCalculator(
getHashPartitionCount(session),
sparkContext.getConf().getInt(SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG, 0));
PhysicalResourceSettings physicalResourceSettings = prestoSparkPhysicalResourceCalculator
.calculate(plan.getRoot(), new PrestoSparkSourceStatsCollector(metadata, session), session);
Map<FunctionKind, Set<String>> functionsInvoked = Collections.emptyMap();
if (isLogInvokedFunctionNamesEnabled(session)) {
Expand Down
Loading

0 comments on commit e9adf8c

Please sign in to comment.