Skip to content

Commit

Permalink
Add dynamic filter canonicalization in UnaliasSymbolReferences
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 authored and highker committed Aug 23, 2020
1 parent 0cfa23f commit ad31ac0
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,8 @@ public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
Optional<VariableReferenceExpression> canonicalLeftHashVariable = canonicalize(node.getLeftHashVariable());
Optional<VariableReferenceExpression> canonicalRightHashVariable = canonicalize(node.getRightHashVariable());

Map<String, VariableReferenceExpression> canonicalDynamicFilters = canonicalizeAndDistinct(node.getDynamicFilters());

if (node.getType().equals(INNER)) {
canonicalCriteria.stream()
.filter(clause -> clause.getLeft().getType().equals(clause.getRight().getType()))
Expand All @@ -545,7 +547,7 @@ public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
canonicalLeftHashVariable,
canonicalRightHashVariable,
node.getDistributionType(),
node.getDynamicFilters());
canonicalDynamicFilters);
}

@Override
Expand Down Expand Up @@ -764,6 +766,19 @@ private List<VariableReferenceExpression> canonicalizeAndDistinct(List<VariableR
return builder.build();
}

private Map<String, VariableReferenceExpression> canonicalizeAndDistinct(Map<String, VariableReferenceExpression> dynamicFilters)
{
Set<VariableReferenceExpression> added = new HashSet<>();
ImmutableMap.Builder<String, VariableReferenceExpression> builder = ImmutableMap.builder();
for (Map.Entry<String, VariableReferenceExpression> entry : dynamicFilters.entrySet()) {
VariableReferenceExpression canonical = canonicalize(entry.getValue());
if (added.add(canonical)) {
builder.put(entry.getKey(), canonical);
}
}
return builder.build();
}

private WindowNode.Specification canonicalizeAndDistinct(WindowNode.Specification specification)
{
return new WindowNode.Specification(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import javax.annotation.concurrent.Immutable;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -124,7 +125,11 @@ public JoinNode(@JsonProperty("id") PlanNodeId id,
}

for (VariableReferenceExpression variableReferenceExpression : dynamicFilters.values()) {
checkArgument(right.getOutputVariables().contains(variableReferenceExpression), "Right join input doesn't contain symbol for dynamic filter: %s", variableReferenceExpression);
checkArgument(right.getOutputVariables().contains(variableReferenceExpression), format(
"Right join input doesn't contain symbol for dynamic filter: %s, rightVariables: %s, dynamicFilters.values(): %s",
variableReferenceExpression,
Arrays.toString(right.getOutputVariables().toArray()),
Arrays.toString(dynamicFilters.values().toArray())));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -41,6 +42,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.intersection;
import static java.lang.String.format;

/**
* When dynamic filter assignments are present on a Join node, they should be consumed by a Filter node on its probe side
Expand Down Expand Up @@ -99,7 +101,12 @@ private Set<String> extractUnmatchedDynamicFilters(AbstractJoinNode node, Void c

Set<String> consumedBuildSide = node.getBuild().accept(this, context);
Set<String> unconsumedByBuildSide = intersection(currentJoinDynamicFilters, consumedBuildSide);
verify(unconsumedByBuildSide.isEmpty(), "Dynamic filters %s present in join were consumed by its build side.", unconsumedByBuildSide);
verify(unconsumedByBuildSide.isEmpty(),
format(
"Dynamic filters %s present in join were consumed by its build side. consumedBuildSide %s, currentJoinDynamicFilters %s",
Arrays.toString(unconsumedByBuildSide.toArray()),
Arrays.toString(consumedBuildSide.toArray()),
Arrays.toString(currentJoinDynamicFilters.toArray())));

Set<String> unmatched = new HashSet<>(consumedBuildSide);
unmatched.addAll(consumedProbeSide);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testUnconsumedDynamicFilterInJoin()
validatePlan(root);
}

@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters \\[DF\\] present in join were consumed by its build side.")
@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters \\[DF\\] present in join were consumed by its build side. consumedBuildSide \\[DF\\], currentJoinDynamicFilters \\[DF\\]")
public void testDynamicFilterConsumedOnBuildSide()
{
PlanNode root = builder.join(
Expand Down

0 comments on commit ad31ac0

Please sign in to comment.