Skip to content

Commit

Permalink
Move Delete TableWriter TableFinish node to SPI
Browse files Browse the repository at this point in the history
  • Loading branch information
feilong-liu committed Nov 19, 2024
1 parent 1b88d3b commit 0df90af
Show file tree
Hide file tree
Showing 59 changed files with 400 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
Expand All @@ -42,7 +43,6 @@
import com.facebook.presto.spi.statistics.TableWriterNodeStatistics;
import com.facebook.presto.sql.planner.CanonicalPlan;
import com.facebook.presto.sql.planner.PlanNodeCanonicalInfo;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.planPrinter.PlanNodeStats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SemiJoinNode;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.plan.DeleteNode;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.VerifyException;
Expand Down Expand Up @@ -90,7 +89,7 @@ public static TableWriteInfo createTableWriteInfo(PlanNode planNode, Metadata me
private static Optional<ExecutionWriterTarget> createWriterTarget(Optional<TableFinishNode> finishNodeOptional, Metadata metadata, Session session)
{
if (finishNodeOptional.isPresent()) {
WriterTarget target = finishNodeOptional.get().getTarget().orElseThrow(() -> new VerifyException("target is absent"));
TableWriterNode.WriterTarget target = finishNodeOptional.get().getTarget().orElseThrow(() -> new VerifyException("target is absent"));
if (target instanceof TableWriterNode.CreateName) {
TableWriterNode.CreateName create = (TableWriterNode.CreateName) target;
return Optional.of(new ExecutionWriterTarget.CreateHandle(metadata.beginCreateTable(session, create.getConnectorId().getCatalogName(), create.getTableMetadata(), create.getLayout()), create.getSchemaTableName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.MaterializedViewDefinition;
import com.facebook.presto.spi.NewTableLayout;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TableHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.MaterializedViewDefinition;
import com.facebook.presto.spi.NewTableLayout;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.SystemTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.MaterializedViewDefinition;
import com.facebook.presto.spi.MaterializedViewStatus;
import com.facebook.presto.spi.NewTableLayout;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.SchemaTableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.StatisticAggregationsDescriptor;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor;
import com.google.common.collect.ImmutableList;

import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.StatisticAggregationsDescriptor;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
Expand Down Expand Up @@ -469,9 +469,9 @@ public long getFinalRowCount()
{
checkState(uncommittedRecoverableLifespanAndStageStates.isEmpty(), "All recoverable LifespanAndStage should be committed when fetching final row count");
return Streams.concat(
noCommitUnrecoverableLifespanAndStageStates.values().stream(),
taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
committedRecoverableLifespanAndStages.values().stream())
noCommitUnrecoverableLifespanAndStageStates.values().stream(),
taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
committedRecoverableLifespanAndStages.values().stream())
.mapToLong(LifespanAndStageState::getRowCount)
.sum();
}
Expand All @@ -480,9 +480,9 @@ public List<Slice> getFinalFragments()
{
checkState(uncommittedRecoverableLifespanAndStageStates.isEmpty(), "All recoverable LifespanAndStage should be committed when fetching final fragments");
return Streams.concat(
noCommitUnrecoverableLifespanAndStageStates.values().stream(),
taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
committedRecoverableLifespanAndStages.values().stream())
noCommitUnrecoverableLifespanAndStageStates.values().stream(),
taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
committedRecoverableLifespanAndStages.values().stream())
.map(LifespanAndStageState::getFragments)
.flatMap(List::stream)
.collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,44 @@
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.NewTableLayout;
import com.facebook.presto.metadata.PartitioningMetadata;
import com.facebook.presto.metadata.TableLayout;
import com.facebook.presto.metadata.TableLayoutResult;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.NewTableLayout;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SourceLocation;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.TableMetadata;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.function.AggregationFunctionImplementation;
import com.facebook.presto.spi.function.FunctionHandle;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.StatisticAggregations;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.facebook.presto.sql.planner.BasePlanFragmenter;
import com.facebook.presto.sql.planner.StatisticsAggregationPlanner;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.StatisticAggregations;
import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterMergeNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -318,14 +323,14 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle);
TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, catalogName, tableMetadata.getMetadata());
StatisticsAggregationPlanner.TableStatisticAggregation statisticsResult = statisticsAggregationPlanner.createStatisticsAggregation(statisticsMetadata, columnNameToVariable);
StatisticAggregations.Parts aggregations = statisticsResult.getAggregations().splitIntoPartialAndFinal(variableAllocator, metadata.getFunctionAndTypeManager());
StatisticAggregations.Parts aggregations = splitIntoPartialAndFinal(statisticsResult.getAggregations(), variableAllocator, metadata.getFunctionAndTypeManager());
PlanNode tableWriterMerge;

// Disabled by default. Enable when the column statistics are essential for future runtime adaptive plan optimizations
boolean enableStatsCollectionForTemporaryTable = SystemSessionProperties.isEnableStatsCollectionForTemporaryTable(session);

if (isTableWriterMergeOperatorEnabled(session)) {
StatisticAggregations.Parts localAggregations = aggregations.getPartialAggregation().splitIntoPartialAndIntermediate(variableAllocator, metadata.getFunctionAndTypeManager());
StatisticAggregations.Parts localAggregations = splitIntoPartialAndIntermediate(aggregations.getPartialAggregation(), variableAllocator, metadata.getFunctionAndTypeManager());
tableWriterMerge = new TableWriterMergeNode(
sourceLocation,
idAllocator.getNextId(),
Expand Down Expand Up @@ -384,4 +389,59 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getFinalAggregation()) : Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(statisticsResult.getDescriptor()) : Optional.empty());
}

public static StatisticAggregations.Parts splitIntoPartialAndFinal(StatisticAggregations statisticAggregations, VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager)
{
return split(statisticAggregations, variableAllocator, functionAndTypeManager, false);
}

public static StatisticAggregations.Parts splitIntoPartialAndIntermediate(StatisticAggregations statisticAggregations, VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager)
{
return split(statisticAggregations, variableAllocator, functionAndTypeManager, true);
}

private static StatisticAggregations.Parts split(StatisticAggregations statisticAggregations, VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager, boolean intermediate)
{
ImmutableMap.Builder<VariableReferenceExpression, AggregationNode.Aggregation> finalOrIntermediateAggregations = ImmutableMap.builder();
ImmutableMap.Builder<VariableReferenceExpression, AggregationNode.Aggregation> partialAggregations = ImmutableMap.builder();
for (Map.Entry<VariableReferenceExpression, AggregationNode.Aggregation> entry : statisticAggregations.getAggregations().entrySet()) {
AggregationNode.Aggregation originalAggregation = entry.getValue();
FunctionHandle functionHandle = originalAggregation.getFunctionHandle();
AggregationFunctionImplementation function = functionAndTypeManager.getAggregateFunctionImplementation(functionHandle);

// create partial aggregation
VariableReferenceExpression partialVariable = variableAllocator.newVariable(entry.getValue().getCall().getSourceLocation(), functionAndTypeManager.getFunctionMetadata(functionHandle).getName().getObjectName(), function.getIntermediateType());
partialAggregations.put(partialVariable, new AggregationNode.Aggregation(
new CallExpression(
originalAggregation.getCall().getSourceLocation(),
originalAggregation.getCall().getDisplayName(),
functionHandle,
function.getIntermediateType(),
originalAggregation.getArguments()),
originalAggregation.getFilter(),
originalAggregation.getOrderBy(),
originalAggregation.isDistinct(),
originalAggregation.getMask()));

// create final aggregation
finalOrIntermediateAggregations.put(entry.getKey(),
new AggregationNode.Aggregation(
new CallExpression(
originalAggregation.getCall().getSourceLocation(),
originalAggregation.getCall().getDisplayName(),
functionHandle,
intermediate ? function.getIntermediateType() : function.getFinalType(),
ImmutableList.of(partialVariable)),
Optional.empty(),
Optional.empty(),
false,
Optional.empty()));
}

StatisticAggregations finalOrIntermediateAggregation = new StatisticAggregations(finalOrIntermediateAggregations.build(), statisticAggregations.getGroupingVariables());
return new StatisticAggregations.Parts(
intermediate ? Optional.empty() : Optional.of(finalOrIntermediateAggregation),
intermediate ? Optional.of(finalOrIntermediateAggregation) : Optional.empty(),
new StatisticAggregations(partialAggregations.build(), statisticAggregations.getGroupingVariables()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.StageExecutionDescriptor;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
Expand All @@ -45,8 +47,6 @@
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SemiJoinNode;
import com.facebook.presto.spi.plan.SortNode;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.ValuesNode;
Expand All @@ -59,8 +61,6 @@
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.facebook.presto.sql.planner.plan.UnnestNode;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.WindowNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import com.facebook.presto.spi.plan.AggregationNode.Aggregation;
import com.facebook.presto.spi.plan.AggregationNode.Step;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.FilterNode;
Expand All @@ -163,7 +164,10 @@
import com.facebook.presto.spi.plan.SortNode;
import com.facebook.presto.spi.plan.SpatialJoinNode;
import com.facebook.presto.spi.plan.StageExecutionDescriptor;
import com.facebook.presto.spi.plan.StatisticAggregationsDescriptor;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.ValuesNode;
Expand Down Expand Up @@ -192,7 +196,6 @@
import com.facebook.presto.sql.gen.PageFunctionCompiler;
import com.facebook.presto.sql.planner.optimizations.IndexJoinOptimizer;
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
import com.facebook.presto.sql.planner.plan.DeleteNode;
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode;
Expand All @@ -204,11 +207,8 @@
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterMergeNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.facebook.presto.sql.planner.plan.UnnestNode;
import com.facebook.presto.sql.planner.plan.UpdateNode;
Expand Down
Loading

0 comments on commit 0df90af

Please sign in to comment.