Commit b4a186c

Rename TableWriterNode#partitioningScheme to tablePartitioningScheme
wenleix committed Feb 5, 2020
1 parent 5a7074b commit b4a186c
Showing 9 changed files with 22 additions and 22 deletions.
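
The change is a mechanical rename: TableWriterNode's Optional<PartitioningScheme> is now exposed as tablePartitioningScheme, presumably to make explicit that it describes the target table's write partitioning rather than a generic exchange partitioning scheme. Every call site swaps one getter for the other; a minimal sketch of the pattern repeated in the hunks below:

    // Before this commit:
    if (node.getPartitioningScheme().isPresent()) {
        // partitioned write
    }

    // After this commit:
    if (node.getTablePartitioningScheme().isPresent()) {
        // partitioned write
    }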

File 1 of 9

@@ -2195,7 +2195,7 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanContext context)
     public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
     {
         // Set table writer count
-        if (node.getPartitioningScheme().isPresent()) {
+        if (node.getTablePartitioningScheme().isPresent()) {
             context.setDriverInstanceCount(getTaskPartitionedWriterCount(session));
         }
         else {

File 2 of 9

@@ -447,8 +447,8 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProperties> context)
     @Override
     public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentProperties> context)
     {
-        if (node.getPartitioningScheme().isPresent()) {
-            context.get().setDistribution(node.getPartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
+        if (node.getTablePartitioningScheme().isPresent()) {
+            context.get().setDistribution(node.getTablePartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
         }
         return context.defaultRewrite(node, context.get());
     }

File 3 of 9

@@ -51,7 +51,7 @@ public class PushTableWriteThroughUnion
             // guaranteed regardless of this optimizer. The level of local parallelism will be
             // determined by LocalExecutionPlanner separately, and shouldn't be a concern of
             // this optimizer.
-            .matching(tableWriter -> !tableWriter.getPartitioningScheme().isPresent())
+            .matching(tableWriter -> !tableWriter.getTablePartitioningScheme().isPresent())
             .with(source().matching(union().capturedAs(CHILD)));

     @Override
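
The comment in the hunk above carries the rule's rationale: local write parallelism is LocalExecutionPlanner's concern, so this rule only needs to avoid writers that do carry a table partitioning scheme. A small sketch of that guard, restated as a plain predicate over a hypothetical stand-in type (this is not the actual rule-pattern API):

    import java.util.Optional;

    public class PushThroughUnionGuard
    {
        // Hypothetical stand-in for the planner's TableWriterNode.
        record Writer(Optional<Object> tablePartitioningScheme) {}

        // The rule matches only writers without a table partitioning scheme.
        static boolean eligible(Writer writer)
        {
            return !writer.tablePartitioningScheme().isPresent();
        }
    }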

File 4 of 9

@@ -497,7 +497,7 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
                     node.getTableCommitContextVariable(),
                     node.getColumns(),
                     node.getColumnNames(),
-                    node.getPartitioningScheme(),
+                    node.getTablePartitioningScheme(),
                     rewrittenStatisticsAggregation));
         }
         return Result.empty();

File 5 of 9

@@ -575,7 +575,7 @@ public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties preferredProperties)
     {
         PlanWithProperties source = node.getSource().accept(this, preferredProperties);

-        Optional<PartitioningScheme> partitioningScheme = node.getPartitioningScheme();
+        Optional<PartitioningScheme> partitioningScheme = node.getTablePartitioningScheme();
         if (!partitioningScheme.isPresent()) {
             if (scaleWriters) {
                 partitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables()));

File 6 of 9

@@ -486,11 +486,11 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, StreamPreferredProperties parentPreferences)
     @Override
     public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNode, StreamPreferredProperties parentPreferences)
     {
-        if (originalTableWriterNode.getPartitioningScheme().isPresent() && getTaskPartitionedWriterCount(session) == 1) {
+        if (originalTableWriterNode.getTablePartitioningScheme().isPresent() && getTaskPartitionedWriterCount(session) == 1) {
             return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session));
         }

-        if (!originalTableWriterNode.getPartitioningScheme().isPresent() && getTaskWriterCount(session) == 1) {
+        if (!originalTableWriterNode.getTablePartitioningScheme().isPresent() && getTaskWriterCount(session) == 1) {
             return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session));
         }

@@ -506,7 +506,7 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNode, StreamPreferredProperties parentPreferences)

         PlanWithProperties tableWriter;

-        if (!originalTableWriterNode.getPartitioningScheme().isPresent()) {
+        if (!originalTableWriterNode.getTablePartitioningScheme().isPresent()) {
             tableWriter = planAndEnforceChildren(
                     new TableWriterNode(
                             originalTableWriterNode.getId(),
@@ -517,7 +517,7 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNode, StreamPreferredProperties parentPreferences)
                             variableAllocator.newVariable("partialcontext", VARBINARY),
                             originalTableWriterNode.getColumns(),
                             originalTableWriterNode.getColumnNames(),
-                            originalTableWriterNode.getPartitioningScheme(),
+                            originalTableWriterNode.getTablePartitioningScheme(),
                             statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation)),
                     fixedParallelism(),
                     fixedParallelism());
@@ -529,7 +529,7 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNode, StreamPreferredProperties parentPreferences)
                             idAllocator.getNextId(),
                             LOCAL,
                             source.getNode(),
-                            originalTableWriterNode.getPartitioningScheme().get()),
+                            originalTableWriterNode.getTablePartitioningScheme().get()),
                     source.getProperties());
             tableWriter = deriveProperties(
                     new TableWriterNode(
@@ -541,7 +541,7 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNode, StreamPreferredProperties parentPreferences)
                             variableAllocator.newVariable("partialcontext", VARBINARY),
                             originalTableWriterNode.getColumns(),
                             originalTableWriterNode.getColumnNames(),
-                            originalTableWriterNode.getPartitioningScheme(),
+                            originalTableWriterNode.getTablePartitioningScheme(),
                             statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation)),
                     exchange.getProperties());
         }
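
Taken together, the five hunks in this file leave the existing branching intact; only the getter name changes. A condensed sketch of that logic (names taken from the diff, structure abbreviated):

    boolean partitioned = node.getTablePartitioningScheme().isPresent();
    if (partitioned && getTaskPartitionedWriterCount(session) == 1) {
        // plan children for a single stream
    }
    else if (!partitioned && getTaskWriterCount(session) == 1) {
        // plan children for a single stream
    }
    else if (!partitioned) {
        // rebuild the TableWriterNode and enforce fixed parallelism
    }
    else {
        // insert a LOCAL exchange using the table's partitioning scheme, then write
    }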

File 7 of 9

@@ -669,8 +669,8 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<Set<VariableReferenceExpression>> context)
     {
         ImmutableSet.Builder<VariableReferenceExpression> expectedInputs = ImmutableSet.<VariableReferenceExpression>builder()
                 .addAll(node.getColumns());
-        if (node.getPartitioningScheme().isPresent()) {
-            PartitioningScheme partitioningScheme = node.getPartitioningScheme().get();
+        if (node.getTablePartitioningScheme().isPresent()) {
+            PartitioningScheme partitioningScheme = node.getTablePartitioningScheme().get();
             partitioningScheme.getPartitioning().getVariableReferences().forEach(expectedInputs::add);
             partitioningScheme.getHashColumn().ifPresent(expectedInputs::add);
         }
@@ -691,7 +691,7 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<Set<VariableReferenceExpression>> context)
                 node.getTableCommitContextVariable(),
                 node.getColumns(),
                 node.getColumnNames(),
-                node.getPartitioningScheme(),
+                node.getTablePartitioningScheme(),
                 node.getStatisticsAggregation());
     }


File 8 of 9

@@ -235,7 +235,7 @@ public TableWriterNode map(TableWriterNode node, PlanNode source, PlanNodeId new
                 map(node.getTableCommitContextVariable()),
                 columns,
                 node.getColumnNames(),
-                node.getPartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)),
+                node.getTablePartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)),
                 node.getStatisticsAggregation().map(this::map));
     }


File 9 of 9

@@ -47,7 +47,7 @@ public class TableWriterNode
     private final VariableReferenceExpression tableCommitContextVariable;
     private final List<VariableReferenceExpression> columns;
     private final List<String> columnNames;
-    private final Optional<PartitioningScheme> partitioningScheme;
+    private final Optional<PartitioningScheme> tablePartitioningScheme;
     private final Optional<StatisticAggregations> statisticsAggregation;
     private final List<VariableReferenceExpression> outputs;

@@ -61,7 +61,7 @@ public TableWriterNode(
             @JsonProperty("tableCommitContextVariable") VariableReferenceExpression tableCommitContextVariable,
             @JsonProperty("columns") List<VariableReferenceExpression> columns,
             @JsonProperty("columnNames") List<String> columnNames,
-            @JsonProperty("partitioningScheme") Optional<PartitioningScheme> partitioningScheme,
+            @JsonProperty("partitioningScheme") Optional<PartitioningScheme> tablePartitioningScheme,
             @JsonProperty("statisticsAggregation") Optional<StatisticAggregations> statisticsAggregation)
     {
         super(id);
@@ -77,7 +77,7 @@ public TableWriterNode(
         this.tableCommitContextVariable = requireNonNull(tableCommitContextVariable, "tableCommitContextVariable is null");
         this.columns = ImmutableList.copyOf(columns);
         this.columnNames = ImmutableList.copyOf(columnNames);
-        this.partitioningScheme = requireNonNull(partitioningScheme, "partitioningScheme is null");
+        this.tablePartitioningScheme = requireNonNull(tablePartitioningScheme, "partitioningScheme is null");
         this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null");

         ImmutableList.Builder<VariableReferenceExpression> outputs = ImmutableList.<VariableReferenceExpression>builder()
@@ -134,9 +134,9 @@ public List<String> getColumnNames()
     }

     @JsonProperty
-    public Optional<PartitioningScheme> getPartitioningScheme()
+    public Optional<PartitioningScheme> getTablePartitioningScheme()
     {
-        return partitioningScheme;
+        return tablePartitioningScheme;
     }

     @JsonProperty
@@ -166,7 +166,7 @@ public <R, C> R accept(InternalPlanVisitor<R, C> visitor, C context)
     @Override
     public PlanNode replaceChildren(List<PlanNode> newChildren)
     {
-        return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, partitioningScheme, statisticsAggregation);
+        return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, tablePartitioningScheme, statisticsAggregation);
     }

     // only used during planning -- will not be serialized
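
Note that in the constructor hunk above the JSON name stays "partitioningScheme" even though the Java parameter and field are renamed: @JsonProperty decouples the wire name from the Java identifier. A minimal Jackson sketch of that pattern (a hypothetical Example class, not Presto code):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;

    public class Example
    {
        private final String tablePartitioningScheme;

        @JsonCreator
        public Example(@JsonProperty("partitioningScheme") String tablePartitioningScheme)
        {
            // A JSON document {"partitioningScheme": "..."} still binds here,
            // even though the Java-side identifier has changed.
            this.tablePartitioningScheme = tablePartitioningScheme;
        }
    }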
