Skip to content

Commit

Permalink
Fix logging of plan optimizer
Browse files Browse the repository at this point in the history
Change the result type of PlanOptimizer, so that it not only returns the new plan node, but also returns whether the optimizer is triggered or not.
  • Loading branch information
feilong-liu committed Oct 26, 2023
1 parent 9bce03a commit 82a3bc8
Show file tree
Hide file tree
Showing 42 changed files with 433 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ private Optional<PlanFragment> performRuntimeOptimizations(StreamingSubPlan subP
PlanFragment fragment = subPlan.getFragment();
PlanNode newRoot = fragment.getRoot();
for (PlanOptimizer optimizer : runtimePlanOptimizers) {
newRoot = optimizer.optimize(newRoot, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector);
newRoot = optimizer.optimize(newRoot, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector).getPlanNode();
}
if (newRoot != fragment.getRoot()) {
StatsAndCosts estimatedStatsAndCosts = fragment.getStatsAndCosts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ private Optional<PlanFragment> performRuntimeOptimizations(StreamingSubPlan subP
PlanFragment fragment = subPlan.getFragment();
PlanNode newRoot = fragment.getRoot();
for (PlanOptimizer optimizer : runtimePlanOptimizers) {
newRoot = optimizer.optimize(newRoot, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector);
newRoot = optimizer.optimize(newRoot, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector).getPlanNode();
}
if (newRoot != fragment.getRoot()) {
StatsAndCosts estimatedStatsAndCosts = fragment.getStatsAndCosts();
Expand Down
15 changes: 8 additions & 7 deletions presto-main/src/main/java/com/facebook/presto/sql/Optimizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.facebook.presto.sql.planner.iterative.IterativeOptimizer;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizerResult;
import com.facebook.presto.sql.planner.optimizations.StatsRecordingPlanOptimizer;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
Expand Down Expand Up @@ -113,8 +114,8 @@ public Plan validateAndOptimizePlan(PlanNode root, PlanStage stage)
throw new PrestoException(QUERY_PLANNING_TIMEOUT, String.format("The query optimizer exceeded the timeout of %s.", getQueryAnalyzerTimeout(session).toString()));
}
long start = System.nanoTime();
PlanNode newRoot = optimizer.optimize(root, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector);
requireNonNull(newRoot, format("%s returned a null plan", optimizer.getClass().getName()));
PlanOptimizerResult optimizerResult = optimizer.optimize(root, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector);
requireNonNull(optimizerResult, format("%s returned a null plan", optimizer.getClass().getName()));
if (enableVerboseRuntimeStats || trackOptimizerRuntime(session, optimizer)) {
String optimizerName = optimizer.getClass().getSimpleName();
if (optimizer instanceof StatsRecordingPlanOptimizer) {
Expand All @@ -124,8 +125,8 @@ public Plan validateAndOptimizePlan(PlanNode root, PlanStage stage)
}
TypeProvider types = TypeProvider.viewOf(variableAllocator.getVariables());

collectOptimizerInformation(optimizer, root, newRoot, types);
root = newRoot;
collectOptimizerInformation(optimizer, root, optimizerResult, types);
root = optimizerResult.getPlanNode();
}
}

Expand Down Expand Up @@ -164,15 +165,15 @@ private StatsAndCosts computeStats(PlanNode root, TypeProvider types)
return StatsAndCosts.empty();
}

private void collectOptimizerInformation(PlanOptimizer optimizer, PlanNode oldNode, PlanNode newNode, TypeProvider types)
private void collectOptimizerInformation(PlanOptimizer optimizer, PlanNode oldNode, PlanOptimizerResult planOptimizerResult, TypeProvider types)
{
if (optimizer instanceof IterativeOptimizer) {
// iterative optimizers do their own recording of what rules got triggered
return;
}

String optimizerName = optimizer.getClass().getSimpleName();
boolean isTriggered = (oldNode != newNode);
boolean isTriggered = planOptimizerResult.isOptimizerTriggered();
boolean isApplicable =
isTriggered ||
!optimizer.isEnabled(session) && isVerboseOptimizerInfoEnabled(session) &&
Expand All @@ -187,7 +188,7 @@ private void collectOptimizerInformation(PlanOptimizer optimizer, PlanNode oldNo

if (isTriggered && isVerboseOptimizerResults(session, optimizerName)) {
String oldNodeStr = PlannerUtils.getPlanString(oldNode, session, types, metadata, false);
String newNodeStr = PlannerUtils.getPlanString(newNode, session, types, metadata, false);
String newNodeStr = PlannerUtils.getPlanString(planOptimizerResult.getPlanNode(), session, types, metadata, false);
session.getOptimizerResultCollector().addOptimizerResult(optimizerName, oldNodeStr, newNodeStr);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.facebook.presto.sql.planner.RuleStatsRecorder;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizerResult;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -98,15 +99,18 @@ public IterativeOptimizer(Metadata metadata, RuleStatsRecorder stats, StatsCalcu
}

@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
// only disable new rules if we have legacy rules to fall back to
if (!SystemSessionProperties.isNewOptimizerEnabled(session) && !legacyRules.isEmpty()) {
boolean planChanged = false;
for (PlanOptimizer optimizer : legacyRules) {
plan = optimizer.optimize(plan, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector);
PlanOptimizerResult planOptimizerResult = optimizer.optimize(plan, session, TypeProvider.viewOf(variableAllocator.getVariables()), variableAllocator, idAllocator, warningCollector);
plan = planOptimizerResult.getPlanNode();
planChanged = planChanged || planOptimizerResult.isOptimizerTriggered();
}

return plan;
return PlanOptimizerResult.optimizerResult(plan, planChanged);
}

Memo memo;
Expand All @@ -132,10 +136,10 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Var
boolean planChanged = exploreGroup(memo.getRootGroup(), context, matcher);
context.collectOptimizerInformation();
if (!planChanged) {
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

return memo.extract();
return PlanOptimizerResult.optimizerResult(memo.extract(), true);
}

private boolean exploreGroup(int group, Context context, Matcher matcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizerResult;
import com.facebook.presto.sql.planner.plan.AbstractJoinNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
Expand Down Expand Up @@ -74,21 +75,29 @@ public RemoveUnsupportedDynamicFilters(FunctionAndTypeManager functionAndTypeMan
}

@Override
public PlanNode optimize(
public PlanOptimizerResult optimize(
PlanNode plan,
Session session,
TypeProvider types,
VariableAllocator variableAllocator,
PlanNodeIdAllocator idAllocator,
WarningCollector warningCollector)
{
PlanWithConsumedDynamicFilters result = plan.accept(new RemoveUnsupportedDynamicFilters.Rewriter(), ImmutableSet.of());
return result.getNode();
Rewriter rewriter = new RemoveUnsupportedDynamicFilters.Rewriter();
PlanWithConsumedDynamicFilters result = plan.accept(rewriter, ImmutableSet.of());
return PlanOptimizerResult.optimizerResult(result.getNode(), rewriter.isPlanChanged());
}

private class Rewriter
extends InternalPlanVisitor<PlanWithConsumedDynamicFilters, Set<String>>
{
boolean planChanged;

public boolean isPlanChanged()
{
return planChanged;
}

@Override
public PlanWithConsumedDynamicFilters visitPlan(PlanNode node, Set<String> allowedDynamicFilterIds)
{
Expand Down Expand Up @@ -198,6 +207,7 @@ public PlanWithConsumedDynamicFilters visitFilter(FilterNode node, Set<String> a
RowExpression modified;
if (source instanceof TableScanNode) {
// Keep only allowed dynamic filters
planChanged = true;
modified = removeDynamicFilters(original, allowedDynamicFilterIds, consumedDynamicFilterIds);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ public AddExchanges(Metadata metadata, SqlParser parser, PartitioningProviderMan
}

@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager).accept(plan, PreferredProperties.any());
return result.getNode();
boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent();
return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
}

private class Rewriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ public AddLocalExchanges(Metadata metadata, SqlParser parser)
}

@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
PlanWithProperties result = new Rewriter(variableAllocator, idAllocator, session).accept(plan, any());
return result.getNode();
boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isLocal()).findFirst().isPresent();
return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
}

private class Rewriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public ApplyConnectorOptimization(Supplier<Map<ConnectorId, Set<ConnectorPlanOpt
}

@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
requireNonNull(plan, "plan is null");
requireNonNull(session, "session is null");
Expand All @@ -89,7 +89,7 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Var

Map<ConnectorId, Set<ConnectorPlanOptimizer>> connectorOptimizers = connectorOptimizersSupplier.get();
if (connectorOptimizers.isEmpty()) {
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

// retrieve all the connectors
Expand Down Expand Up @@ -173,7 +173,7 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Var
}
}

return plan;
return PlanOptimizerResult.optimizerResult(plan, true);
}

private static void getAllConnectorIds(PlanNode node, ImmutableSet.Builder<ConnectorId> builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class CheckSubqueryNodesAreRewritten
implements PlanOptimizer
{
@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
searchFrom(plan).where(ApplyNode.class::isInstance)
.findFirst()
Expand All @@ -52,7 +52,7 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Var
error(lateralJoinNode.getCorrelation(), lateralJoinNode.getOriginSubqueryError());
});

return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

private void error(List<VariableReferenceExpression> correlation, String originSubqueryError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public boolean isEnabled(Session session)
}

@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
requireNonNull(plan, "plan is null");
requireNonNull(session, "session is null");
Expand All @@ -122,9 +122,9 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Var
requireNonNull(idAllocator, "idAllocator is null");
if (isEnabled(session)) {
PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, functionAndTypeManager).accept(plan, new HashComputationSet());
return result.getNode();
return PlanOptimizerResult.optimizerResult(result.getNode(), true);
}
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

private static class Rewriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public boolean isEnabled(Session session)
}

@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
requireNonNull(plan, "plan is null");
requireNonNull(session, "session is null");
Expand All @@ -83,13 +83,13 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Var
requireNonNull(idAllocator, "idAllocator is null");

if (!isEnabled(session)) {
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

// Only enable history based optimization when plan has a join/aggregation.
if (restrictHistoryBasedOptimizationToComplexQuery(session) &&
!PlanNodeSearcher.searchFrom(plan).where(node -> PRECOMPUTE_PLAN_NODES.stream().anyMatch(clazz -> clazz.isInstance(node))).matches()) {
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

long startTimeInNano = System.nanoTime();
Expand All @@ -108,25 +108,25 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Var
.sum();

if (historiesLimitingPlanNodeLimit > getHistoryCanonicalPlanNodeLimit(session)) {
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

// Assign 'statsEquivalentPlanNode' to plan nodes
PlanNode newPlan = SimplePlanRewriter.rewriteWith(new Rewriter(idAllocator), plan, new Context());
// Return original plan if timeout
if (checkTimeOut(startTimeInNano, timeoutInMilliseconds)) {
logOptimizerFailure(session);
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}

// Fetch and cache history based statistics of all plan nodes, so no serial network calls happen later.
boolean registerSucceed = statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds);
// Return original plan if timeout or registration not successful
if (checkTimeOut(startTimeInNano, timeoutInMilliseconds) || !registerSucceed) {
logOptimizerFailure(session);
return plan;
return PlanOptimizerResult.optimizerResult(plan, false);
}
return newPlan;
return PlanOptimizerResult.optimizerResult(newPlan, true);
}

private boolean checkTimeOut(long startTimeInNano, long timeoutInMilliseconds)
Expand Down
Loading

0 comments on commit 82a3bc8

Please sign in to comment.