Fix latency for HBO optimizer
1. Check the timeout during plan canonicalization and plan hashing, and return an empty result when the timeout is exceeded.
2. Add a flag specifying whether to fetch HBO-related data from the cache only (as sketched below). It is false when registering the query and true when getting statistics.
3. Add verbose runtime stats for plan hashing, plan canonicalization, and metadata reads.
feilong-liu committed Nov 10, 2023
1 parent dae8a28 commit 8815076
Showing 7 changed files with 101 additions and 24 deletions.
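
Taken together, the three changes share one control-flow pattern: look up a cache, optionally stop at a cache miss, time the expensive computation, and abandon it once a time budget is exhausted. Below is a minimal, hypothetical Java sketch of that pattern, assuming only the standard library; the TimedCacheLoader class and every name in it are invented for illustration and are not part of this commit.

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

// Hypothetical illustration of the pattern behind this commit; not production code.
public final class TimedCacheLoader<K, V>
{
    private final Map<K, V> cache = new ConcurrentHashMap<>();

    public Optional<V> load(
            K key,
            boolean cacheOnly,                   // change 2: never compute on a miss when true
            long timeoutMillis,                  // change 1: overall time budget
            boolean verbose,                     // change 3: emit timing metrics when true
            Function<K, V> compute,              // the expensive step, e.g. canonicalization + hashing
            ObjLongConsumer<String> recordNanos) // metric sink: (metric name, elapsed nanos)
    {
        long start = System.nanoTime();
        V cached = cache.get(key);
        if (cached != null || cacheOnly) {
            // Cache-only callers get Optional.empty() on a miss instead of triggering a computation.
            return Optional.ofNullable(cached);
        }

        long profileStart = verbose ? System.nanoTime() : 0;
        V value = compute.apply(key);
        if (verbose) {
            recordNanos.accept("compute", System.nanoTime() - profileStart);
        }

        if (NANOSECONDS.toMillis(System.nanoTime() - start) > timeoutMillis) {
            // Over budget: return empty rather than delay the query any further.
            return Optional.empty();
        }
        cache.put(key, value);
        return Optional.of(value);
    }
}
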
@@ -56,4 +56,6 @@ private RuntimeMetricName()
// Size of the data retrieved by read call to storage
public static final String STORAGE_READ_DATA_BYTES = "storageReadDataBytes";
public static final String WRITTEN_FILES_COUNT = "writtenFilesCount";
public static final String HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_PLAN_NODE_HASHES = "historyOptimizerQueryRegistrationGetPlanNodeHashes";
public static final String HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_STATISTICS = "historyOptimizerQueryRegistrationGetStatistics";
}
@@ -36,7 +36,11 @@

import static com.facebook.presto.SystemSessionProperties.getHistoryBasedOptimizerTimeoutLimit;
import static com.facebook.presto.SystemSessionProperties.getHistoryInputTableStatisticsMatchingThreshold;
import static com.facebook.presto.SystemSessionProperties.isVerboseRuntimeStatsEnabled;
import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_PLAN_NODE_HASHES;
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_STATISTICS;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.common.plan.PlanCanonicalizationStrategy.historyBasedPlanCanonicalizationStrategyList;
import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getPredictedPlanStatistics;
import static com.facebook.presto.sql.planner.iterative.Plans.resolveGroupReferences;
@@ -85,17 +89,31 @@ public boolean registerPlan(PlanNode root, Session session, long startTimeInNano
}
ImmutableList.Builder<PlanNodeWithHash> planNodesWithHash = ImmutableList.builder();
Iterable<PlanNode> planNodeIterable = forTree(PlanNode::getSources).depthFirstPreOrder(root);
boolean enableVerboseRuntimeStats = isVerboseRuntimeStatsEnabled(session);
long profileStartTime = 0;
for (PlanNode plan : planNodeIterable) {
if (checkTimeOut(startTimeInNano, timeoutInMilliseconds)) {
historyBasedStatisticsCacheManager.setHistoryBasedQueryRegistrationTimeout(session.getQueryId());
return false;
}
if (plan.getStatsEquivalentPlanNode().isPresent()) {
planNodesWithHash.addAll(getPlanNodeHashes(plan, session).values());
if (enableVerboseRuntimeStats) {
profileStartTime = System.nanoTime();
}
planNodesWithHash.addAll(getPlanNodeHashes(plan, session, false).values());
if (enableVerboseRuntimeStats) {
session.getRuntimeStats().addMetricValue(HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_PLAN_NODE_HASHES, NANO, System.nanoTime() - profileStartTime);
}
}
}
try {
if (enableVerboseRuntimeStats) {
profileStartTime = System.nanoTime();
}
historyBasedStatisticsCacheManager.getStatisticsCache(session.getQueryId(), historyBasedPlanStatisticsProvider, getHistoryBasedOptimizerTimeoutLimit(session).toMillis()).getAll(planNodesWithHash.build());
if (enableVerboseRuntimeStats) {
session.getRuntimeStats().addMetricValue(HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_STATISTICS, NANO, System.nanoTime() - profileStartTime);
}
}
catch (ExecutionException e) {
throw new RuntimeException("Unable to register plan: ", e.getCause());
@@ -132,7 +150,7 @@ public Supplier<HistoryBasedPlanStatisticsProvider> getHistoryBasedPlanStatistic
return historyBasedPlanStatisticsProvider;
}

private Map<PlanCanonicalizationStrategy, PlanNodeWithHash> getPlanNodeHashes(PlanNode plan, Session session)
private Map<PlanCanonicalizationStrategy, PlanNodeWithHash> getPlanNodeHashes(PlanNode plan, Session session, boolean cacheOnly)
{
if (!useHistoryBasedPlanStatisticsEnabled(session) || !plan.getStatsEquivalentPlanNode().isPresent()) {
return ImmutableMap.of();
@@ -142,8 +160,10 @@ private Map<PlanCanonicalizationStrategy, PlanNodeWithHash> getPlanNodeHashes(Pl
ImmutableMap.Builder<PlanCanonicalizationStrategy, PlanNodeWithHash> allHashesBuilder = ImmutableMap.builder();

for (PlanCanonicalizationStrategy strategy : historyBasedPlanCanonicalizationStrategyList()) {
Optional<String> hash = planCanonicalInfoProvider.hash(session, statsEquivalentPlanNode, strategy);
allHashesBuilder.put(strategy, new PlanNodeWithHash(statsEquivalentPlanNode, hash));
Optional<String> hash = planCanonicalInfoProvider.hash(session, statsEquivalentPlanNode, strategy, cacheOnly);
if (hash.isPresent()) {
allHashesBuilder.put(strategy, new PlanNodeWithHash(statsEquivalentPlanNode, hash));
}
}

return allHashesBuilder.build();
@@ -156,7 +176,7 @@ private PlanNodeStatsEstimate getStatistics(PlanNode planNode, Session session,
}

PlanNode plan = resolveGroupReferences(planNode, lookup);
Map<PlanCanonicalizationStrategy, PlanNodeWithHash> allHashes = getPlanNodeHashes(plan, session);
Map<PlanCanonicalizationStrategy, PlanNodeWithHash> allHashes = getPlanNodeHashes(plan, session, true);

Map<PlanNodeWithHash, HistoricalPlanStatistics> statistics = ImmutableMap.of();
try {
@@ -172,7 +192,7 @@ private PlanNodeStatsEstimate getStatistics(PlanNode planNode, Session session,
for (PlanCanonicalizationStrategy strategy : historyBasedPlanCanonicalizationStrategyList()) {
for (Map.Entry<PlanNodeWithHash, HistoricalPlanStatistics> entry : statistics.entrySet()) {
if (allHashes.containsKey(strategy) && entry.getKey().getHash().isPresent() && allHashes.get(strategy).equals(entry.getKey())) {
Optional<List<PlanStatistics>> inputTableStatistics = getPlanNodeInputTableStatistics(plan, session);
Optional<List<PlanStatistics>> inputTableStatistics = getPlanNodeInputTableStatistics(plan, session, true);
if (inputTableStatistics.isPresent()) {
PlanStatistics predictedPlanStatistics = getPredictedPlanStatistics(entry.getValue(), inputTableStatistics.get(), historyMatchingThreshold);
if (predictedPlanStatistics.getConfidence() > 0) {
@@ -188,13 +208,13 @@ private PlanNodeStatsEstimate getStatistics(PlanNode planNode, Session session,
return delegateStats;
}

private Optional<List<PlanStatistics>> getPlanNodeInputTableStatistics(PlanNode plan, Session session)
private Optional<List<PlanStatistics>> getPlanNodeInputTableStatistics(PlanNode plan, Session session, boolean cacheOnly)
{
if (!useHistoryBasedPlanStatisticsEnabled(session) || !plan.getStatsEquivalentPlanNode().isPresent()) {
return Optional.empty();
}

PlanNode statsEquivalentPlanNode = plan.getStatsEquivalentPlanNode().get();
return planCanonicalInfoProvider.getInputTableStatistics(session, statsEquivalentPlanNode);
return planCanonicalInfoProvider.getInputTableStatistics(session, statsEquivalentPlanNode, cacheOnly);
}
}
@@ -14,6 +14,7 @@
package com.facebook.presto.sql.planner;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.plan.PlanCanonicalizationStrategy;
import com.facebook.presto.cost.HistoryBasedStatisticsCacheManager;
import com.facebook.presto.metadata.Metadata;
@@ -36,6 +37,7 @@
import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.getHistoryBasedOptimizerTimeoutLimit;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.common.plan.PlanCanonicalizationStrategy.historyBasedPlanCanonicalizationStrategyList;
import static com.google.common.hash.Hashing.sha256;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -57,42 +59,68 @@ public CachingPlanCanonicalInfoProvider(HistoryBasedStatisticsCacheManager histo
}

@Override
public Optional<String> hash(Session session, PlanNode planNode, PlanCanonicalizationStrategy strategy)
public Optional<String> hash(Session session, PlanNode planNode, PlanCanonicalizationStrategy strategy, boolean cacheOnly)
{
CacheKey key = new CacheKey(planNode, strategy);
return loadValue(session, key).map(PlanNodeCanonicalInfo::getHash);
return loadValue(session, key, cacheOnly).map(PlanNodeCanonicalInfo::getHash);
}

@Override
public Optional<List<PlanStatistics>> getInputTableStatistics(Session session, PlanNode planNode)
public Optional<List<PlanStatistics>> getInputTableStatistics(Session session, PlanNode planNode, boolean cacheOnly)
{
CacheKey key = new CacheKey(planNode, historyBasedPlanCanonicalizationStrategyList().get(0));
return loadValue(session, key).map(PlanNodeCanonicalInfo::getInputTableStatistics);
return loadValue(session, key, cacheOnly).map(PlanNodeCanonicalInfo::getInputTableStatistics);
}

private Optional<PlanNodeCanonicalInfo> loadValue(Session session, CacheKey key)
private Optional<PlanNodeCanonicalInfo> loadValue(Session session, CacheKey key, boolean cacheOnly)
{
long startTimeInNano = System.nanoTime();
long profileStartTime = 0;
long timeoutInMilliseconds = getHistoryBasedOptimizerTimeoutLimit(session).toMillis();
boolean enableVerboseRuntimeStats = SystemSessionProperties.isVerboseRuntimeStatsEnabled(session);
Map<CacheKey, PlanNodeCanonicalInfo> cache = historyBasedStatisticsCacheManager.getCanonicalInfoCache(session.getQueryId());
PlanNodeCanonicalInfo result = cache.get(key);
if (result != null) {
return Optional.of(result);
if (result != null || cacheOnly) {
return Optional.ofNullable(result);
}
CanonicalPlanGenerator.Context context = new CanonicalPlanGenerator.Context();
if (enableVerboseRuntimeStats) {
profileStartTime = System.nanoTime();
}
key.getNode().accept(new CanonicalPlanGenerator(key.getStrategy(), objectMapper, session), context);
if (enableVerboseRuntimeStats) {
profileTime("CanonicalPlanGenerator", profileStartTime, session);
}
if (loadValueTimeout(startTimeInNano, timeoutInMilliseconds)) {
return Optional.empty();
}
for (Map.Entry<PlanNode, CanonicalPlan> entry : context.getCanonicalPlans().entrySet()) {
CanonicalPlan canonicalPlan = entry.getValue();
PlanNode plan = entry.getKey();
if (enableVerboseRuntimeStats) {
profileStartTime = System.nanoTime();
}
String hashValue = hashCanonicalPlan(canonicalPlan, objectMapper);
if (enableVerboseRuntimeStats) {
profileTime("HashCanonicalPlan", profileStartTime, session);
}
if (loadValueTimeout(startTimeInNano, timeoutInMilliseconds)) {
return Optional.empty();
}
// Compute input table statistics for the plan node. This is useful in history based optimizations,
// where historical plan statistics are reused if input tables are similar in size across runs.
ImmutableList.Builder<PlanStatistics> inputTableStatisticsBuilder = ImmutableList.builder();
if (enableVerboseRuntimeStats) {
profileStartTime = System.nanoTime();
}
for (TableScanNode scanNode : context.getInputTables().get(plan)) {
if (loadValueTimeout(startTimeInNano, timeoutInMilliseconds)) {
break;
return Optional.empty();
}
inputTableStatisticsBuilder.add(getPlanStatisticsForTable(session, scanNode));
inputTableStatisticsBuilder.add(getPlanStatisticsForTable(session, scanNode, enableVerboseRuntimeStats));
}
if (enableVerboseRuntimeStats) {
profileTime("GetPlanStatisticsForTable", profileStartTime, session);
}
cache.put(new CacheKey(plan, key.getStrategy()), new PlanNodeCanonicalInfo(hashValue, inputTableStatisticsBuilder.build()));
}
@@ -107,7 +135,12 @@ private boolean loadValueTimeout(long startTimeInNano, long timeoutInMillisecond
return NANOSECONDS.toMillis(System.nanoTime() - startTimeInNano) > timeoutInMilliseconds;
}

private PlanStatistics getPlanStatisticsForTable(Session session, TableScanNode table)
private void profileTime(String name, long startProfileTime, Session session)
{
session.getRuntimeStats().addMetricValue(String.format("CachingPlanCanonicalInfoProvider:%s", name), NANO, System.nanoTime() - startProfileTime);
}

private PlanStatistics getPlanStatisticsForTable(Session session, TableScanNode table, boolean profileRuntime)
{
InputTableCacheKey key = new InputTableCacheKey(new TableHandle(
table.getTable().getConnectorId(),
@@ -119,7 +152,14 @@ private PlanStatistics getPlanStatisticsForTable(Session session, TableScanNode
if (planStatistics != null) {
return planStatistics;
}
long startProfileTime = 0;
if (profileRuntime) {
startProfileTime = System.nanoTime();
}
TableStatistics tableStatistics = metadata.getTableStatistics(session, key.getTableHandle(), key.getColumnHandles(), key.getConstraint());
if (profileRuntime) {
profileTime("ReadFromMetaData", startProfileTime, session);
}
planStatistics = new PlanStatistics(tableStatistics.getRowCount(), tableStatistics.getTotalSize(), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty());
cache.put(key, planStatistics);
return planStatistics;
@@ -32,16 +32,18 @@ public interface PlanCanonicalInfoProvider
* @param session Session for query being run
* @param planNode Plan node to hash
* @param strategy Strategy to canonicalize the plan node
* @param cacheOnly Only fetch from cache, and return Optional.empty() if set to true and no entry found in cache
* @return Hash of the plan node. Returns Optional.empty() if unable to hash.
*/
Optional<String> hash(Session session, PlanNode planNode, PlanCanonicalizationStrategy strategy);
Optional<String> hash(Session session, PlanNode planNode, PlanCanonicalizationStrategy strategy, boolean cacheOnly);

/**
* Canonicalize the plan, and return statistics of input tables. Output order is consistent with
* plan canonicalization.
* @param session Session for query being run
* @param planNode Plan node to hash
* @param cacheOnly Only fetch from cache, and return Optional.empty() if set to true and no entry found in cache
* @return Statistics of leaf input tables to plan node, ordered by a consistent canonicalization strategy.
*/
Optional<List<PlanStatistics>> getInputTableStatistics(Session session, PlanNode planNode);
Optional<List<PlanStatistics>> getInputTableStatistics(Session session, PlanNode planNode, boolean cacheOnly);
}
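
For reference, a hedged usage sketch of the updated interface from a caller's point of view; the variables below (planCanonicalInfoProvider, session, statsEquivalentPlanNode, strategy) are assumed to be in scope, as they are at the call sites in this diff:

// Cache-only lookup: an empty Optional can mean either "could not hash" or "not cached yet".
Optional<String> hash = planCanonicalInfoProvider.hash(session, statsEquivalentPlanNode, strategy, true);
if (!hash.isPresent()) {
    // Fall back to default estimates, or retry later with cacheOnly set to false to compute and cache.
}
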
@@ -84,8 +84,8 @@ public static List<CanonicalPlanWithInfo> getCanonicalInfo(
continue;
}
PlanNode statsEquivalentPlanNode = node.getStatsEquivalentPlanNode().get();
Optional<String> hash = planCanonicalInfoProvider.hash(session, statsEquivalentPlanNode, strategy);
Optional<List<PlanStatistics>> inputTableStatistics = planCanonicalInfoProvider.getInputTableStatistics(session, statsEquivalentPlanNode);
Optional<String> hash = planCanonicalInfoProvider.hash(session, statsEquivalentPlanNode, strategy, true);
Optional<List<PlanStatistics>> inputTableStatistics = planCanonicalInfoProvider.getInputTableStatistics(session, statsEquivalentPlanNode, true);
if (hash.isPresent() && inputTableStatistics.isPresent()) {
result.add(new CanonicalPlanWithInfo(new CanonicalPlan(statsEquivalentPlanNode, strategy), new PlanNodeCanonicalInfo(hash.get(), inputTableStatistics.get())));
}
@@ -33,6 +33,7 @@
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.graph.Traverser.forTree;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestCachingPlanCanonicalInfoProvider
@@ -72,13 +73,25 @@ public void testCache()
return;
}
for (PlanCanonicalizationStrategy strategy : historyBasedPlanCanonicalizationStrategyList()) {
planCanonicalInfoProvider.hash(session, child.getStatsEquivalentPlanNode().get(), strategy).get();
planCanonicalInfoProvider.hash(session, child.getStatsEquivalentPlanNode().get(), strategy, false).get();
}
});
// Assert that size of cache remains same, meaning all needed hashes were already cached.
assertEquals(planCanonicalInfoProvider.getCacheSize(), 5L * historyBasedPlanCanonicalizationStrategyList().size());
planCanonicalInfoProvider.getHistoryBasedStatisticsCacheManager().invalidate(session.getQueryId());
assertEquals(planCanonicalInfoProvider.getCacheSize(), 0);

forTree(PlanNode::getSources).depthFirstPreOrder(root).forEach(child -> {
if (!child.getStatsEquivalentPlanNode().isPresent()) {
return;
}
for (PlanCanonicalizationStrategy strategy : historyBasedPlanCanonicalizationStrategyList()) {
// Only read from cache, hence will return Optional.empty() as the cache is already invalidated
assertFalse(planCanonicalInfoProvider.hash(session, child.getStatsEquivalentPlanNode().get(), strategy, true).isPresent());
}
});
// Assert that the cache is not populated, since cache-only reads do not populate it on a miss
assertEquals(planCanonicalInfoProvider.getCacheSize(), 0);
}

private Session createSession()
@@ -116,7 +116,7 @@ public void testUsesHboStatsWhenMatchRuntime()
.registerVariable(planBuilder.variable("c1"))
.filter(planBuilder.rowExpression("c1 IS NOT NULL"),
planBuilder.values(planBuilder.variable("c1")));
Optional<String> hash = historyBasedPlanStatisticsCalculator.getPlanCanonicalInfoProvider().hash(session, statsEquivalentRemoteSource, REMOVE_SAFE_CONSTANTS);
Optional<String> hash = historyBasedPlanStatisticsCalculator.getPlanCanonicalInfoProvider().hash(session, statsEquivalentRemoteSource, REMOVE_SAFE_CONSTANTS, false);

InMemoryHistoryBasedPlanStatisticsProvider historyBasedPlanStatisticsProvider = (InMemoryHistoryBasedPlanStatisticsProvider) historyBasedPlanStatisticsCalculator.getHistoryBasedPlanStatisticsProvider().get();
historyBasedPlanStatisticsProvider.putStats(ImmutableMap.of(
