Introduce pushdownFilter SPI #12875
Conversation
Force-pushed from bc52bf4 to 9774767.
high-level comments on "Introduce pushdownFilter SPI"
@@ -94,6 +95,10 @@
     */
    TableHandle getAlternativeTableHandle(Session session, TableHandle tableHandle, PartitioningHandle partitioningHandle);

    boolean isPushdownFilterSupported(Session session, TableHandle tableHandle);
When FilterNode and TableScanNode get moved to the SPI, this interface can be replaced with a connector optimizer rule (by moving the predicate in the filter node into the table scan's table layout). So shall we add a javadoc to these two interfaces indicating that they will be removed once we have plan nodes in the SPI (ticket #12546), maybe?
ditto for pushdownFilter
@tdcmeehan will work on an `Experimental` annotation for future use like this. But I'm totally fine having this English "Experimental" in the javadoc for now.
We can add it now if we merge #12904 :D
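As a rough illustration of the idea under discussion, a minimal sketch of such an annotation follows. The name `Experimental`, its retention, and its targets are assumptions here; #12904 may define it differently:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ExperimentalDemo
{
    // Hypothetical marker for SPI elements that may change or be removed without notice.
    // Runtime retention lets tooling detect experimental SPI usage via reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    public @interface Experimental {}

    @Experimental
    public boolean isPushdownFilterSupported()
    {
        return false;
    }

    public static void main(String[] args)
            throws Exception
    {
        boolean marked = ExperimentalDemo.class
                .getMethod("isPushdownFilterSupported")
                .isAnnotationPresent(Experimental.class);
        System.out.println("marked @Experimental: " + marked);
    }
}
```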
@@ -239,6 +273,66 @@ public Result apply(TableScanNode tableScanNode, Captures captures, Context cont
    }
}

private static final class TranslateVariableNamesVisitor
Actually @hellium01 has a nice util to do the same thing (21af132) with test coverage. Could you check that out? I have asked him to separate that out as a single PR (#12881)
It might change in the near future as @rongrong is replacing all `Symbol` with `VariableExpression`.
You have `TypeProvider` here, so you can actually construct a proper `VariableReferenceExpressionInliner`. It's actually not safe to use the reference variable's type as the inlined variable's type (I've seen test failures related to this). So the proper way to do it is to use `TypeProvider` to construct `VariableReferenceExpression` and use the variable mapping as context.
I think what you actually need is something like `RowExpressionNodeInliner` to replace `VariableReferenceExpression` with `ColumnReferenceExpression` back and forth?
@rongrong @highker @hellium01 Yi, I found `RowExpressionNodeInliner`. It allows one to replace one RowExpression with another. I could use it to replace VariableReferenceExpression with VariableReferenceExpression, but I don't see ColumnReferenceExpression. Are you suggesting to introduce one?
I thought we already introduced a `ColumnReferenceExpression` that uses ColumnHandle. If not, can you add support for that? It is going to be very useful for pushdown cases.
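To illustrate the kind of back-and-forth replacement being discussed, here is a self-contained toy version. The real `RowExpressionNodeInliner` in Presto works over `RowExpression` trees; the classes below are simplified stand-ins, and `ColumnReference` is a hypothetical class (as the thread notes, it may not exist yet):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InlinerDemo
{
    // Simplified stand-ins for RowExpression and its variants
    interface Expr {}
    record VariableReference(String name) implements Expr {}
    record ColumnReference(String columnHandle) implements Expr {}  // hypothetical
    record Call(String function, List<Expr> arguments) implements Expr {}

    // Recursively replace any subexpression found in the mapping,
    // in the spirit of RowExpressionNodeInliner.replaceExpression
    static Expr replaceExpression(Expr expr, Map<Expr, Expr> mapping)
    {
        Expr replacement = mapping.get(expr);
        if (replacement != null) {
            return replacement;
        }
        if (expr instanceof Call call) {
            return new Call(call.function(), call.arguments().stream()
                    .map(argument -> replaceExpression(argument, mapping))
                    .toList());
        }
        return expr;
    }

    public static void main(String[] args)
    {
        // plan-side predicate: greater_than(x, ten), where "x" is a plan variable
        Expr predicate = new Call("greater_than",
                List.of(new VariableReference("x"), new VariableReference("ten")));

        // map plan variables to connector column references
        Map<Expr, Expr> toColumns = new HashMap<>();
        toColumns.put(new VariableReference("x"), new ColumnReference("hive:col_x"));

        System.out.println(replaceExpression(predicate, toColumns));
    }
}
```

The inverse direction (column reference back to plan variable) is just the same call with an inverted mapping, which is the "back/forth" the reviewer refers to.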
/**
 * Experimental: if true, the engine will invoke pushdownFilter instead of getTableLayouts.
 */
default boolean isPushdownFilterSupported(ConnectorSession session, ConnectorTableHandle tableHandle)
Maybe add a comment saying that it will be replaced once `FilterNode` is in the SPI (#12546).
Curious: why does this SPI need to have `tableHandle`?
- For now it returns true for a partitioned table and false for an unpartitioned table. However, the predicate being pushed down might not include partition columns, so this check doesn't look necessary to me.
- In the future we can push down complex type extractions, so any table with a complex type is a candidate for filter pushdown.
@wenleix In this iteration of Aria, filter pushdown is supported only for tables stored using ORC or DWRF formats. In the future, we hope to add support for Parquet. Technically speaking, I could move that check into the pushdownFilter method itself, but it seems cleaner and safer to have it here.
Finished "Introduce pushdownFilter SPI".
Can we have test coverage on this change? Rather than putting the test in `TestPickTableLayout` (which only aims to test a single rule), can we have the test in `TestLogicalPlanner` or any class inheriting `BasePlanTest`? For such a case, we can run the entire planner.
The reason to test against the entire planner rather than a single rule is the future refactoring that will move the filter pushdown logic from `PickTableLayout` to `ConnectorOptimizer`. That way we can still have valid test coverage.
    return Result.empty();
}

Session session = context.getSession();
if (metadata.isPushdownFilterSupported(session, tableHandle)) {
    PushdownFilterResult pushdownFilterResult = metadata.pushdownFilter(session, tableHandle, TRUE);
If we push `TRUE` into the table scan, we are not expecting to get back any unenforced constraint in the `isNone()` case, right? Or I might have missed something.
Yes, it seems like that should be the case.
@highker @hellium01 This is existing logic. If the table is empty and the connector has a way to detect this with certainty, it will return `none`. Here is the corresponding snippet of code for the original code path.
TableLayoutResult layout = metadata.getLayout(
        session,
        tableHandle,
        Constraint.alwaysTrue(),
        Optional.of(tableScanNode.getOutputSymbols().stream()
                .map(tableScanNode.getAssignments()::get)
                .collect(toImmutableSet())));

if (layout.getLayout().getPredicate().isNone()) {
    return Result.ofPlanNode(new ValuesNode(context.getIdAllocator().getNextId(), tableScanNode.getOutputSymbols(), ImmutableList.of()));
}
Correct me if I am wrong, but it looks to me like lines 234-245 are sort of doing the same thing as lines 248-266, except using `pushdownFilter` with `TRUE` instead of `getLayout` with `Constraint.alwaysTrue()`.
I am curious whether we can just use the original logic from lines 248-266? My understanding is that `metadata.getLayout` is a more universal method, implemented for every connector, while the `pushdownFilter` method is more suitable for `pushPredicateIntoTableScan`.
Or is the idea here that for a connector that supports `pushdownFilter`, `pushdownFilter` supersedes `getLayout` for consistency purposes?
.collect(toImmutableBiMap(
        entry -> entry.getKey().getName(),
        entry -> getColumnName(session, metadata, node.getTable(), entry.getValue())));
RowExpression translatedPredicate = SqlToRowExpressionTranslator.translate(predicate, predicateTypes, ImmutableMap.of(), metadata.getFunctionManager(), metadata.getTypeManager(), session, false)
Let's see if this can be altered based on @rongrong's comment (79b8320#r289458788).
RowExpression translatedPredicate = SqlToRowExpressionTranslator.translate(predicate, predicateTypes, ImmutableMap.of(), metadata.getFunctionManager(), metadata.getTypeManager(), session, false)
        .accept(new TranslateVariableNamesVisitor(), symbolToColumnNameMap);

PushdownFilterResult pushdownFilterResult = metadata.pushdownFilter(session, node.getTable(), translatedPredicate);
I haven't gone into the HiveConnector's code, but it looks like this will replace the old `pickTableLayouts`. There are a few things we likely need to be careful about. One is that there are a few hacks in pushing down predicates on partition keys, to evaluate complicated predicates without returning a huge tupleDomain; that's why the constraint is passed in with `isCandidate`. Otherwise, I am all for having a simplified interface.
Perhaps someone who has deep knowledge of `HiveConnector` can chime in.
Another place that might need to change is `MetadataQueryOptimizer`. Otherwise, we will have to keep both methods in `HiveConnector`.
@hellium01 Yi, indeed, there are a number of places that invoke Metadata.getLayout, and all of these will need to be modified to use pushdownFilter as well. One option would be to fold filterPushdown into Metadata.getLayout. I plan to work through this in a separate PR, because this PR is pretty big already. I would also like to set things up so that multiple people can work in parallel, e.g. once this PR lands, one person can work on the above while another works on subfield pruning and/or implementing filter evaluation in the Hive connector.
Re: isCandidate - My understanding is that the new code path (via pushdownFilter) is missing out on some opportunities to prune partitions using deterministic expressions, e.g. `SELECT * FROM t WHERE length(p) > 10`. That's because `HiveMetadata#pushdownFilter` doesn't specify any predicate when constructing the Constraint object. How about I add a TODO to add logic to specify the predicate?
HivePartitionResult hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, new Constraint<>(entireColumnDomain), session);
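The pruning opportunity under discussion can be sketched with a toy model. Assume a constraint that carries both a domain summary and an optional deterministic predicate, evaluated per candidate partition; all names below are simplified stand-ins for the real Presto classes (`Constraint<ColumnHandle>`, `HivePartitionManager`), not their actual signatures:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class PartitionPruningDemo
{
    // Simplified stand-in for Constraint<ColumnHandle>: a summary domain plus an optional
    // predicate for conditions (like length(p) > 10) that a TupleDomain cannot express
    record Constraint(Predicate<String> domain, Optional<Predicate<String>> predicate)
    {
        boolean isCandidate(String partitionValue)
        {
            // A partition survives only if it matches the domain AND the extra predicate (if any)
            return domain.test(partitionValue)
                    && predicate.map(p -> p.test(partitionValue)).orElse(true);
        }
    }

    static List<String> getPartitions(List<String> allPartitions, Constraint constraint)
    {
        return allPartitions.stream().filter(constraint::isCandidate).toList();
    }

    public static void main(String[] args)
    {
        List<String> partitions = List.of("us", "germany", "united_kingdom", "liechtenstein");

        // WHERE length(p) > 10: prunable only via the predicate, not the domain
        Constraint constraint = new Constraint(value -> true, Optional.of(value -> value.length() > 10));

        System.out.println(getPartitions(partitions, constraint));
    }
}
```

Passing an empty predicate (as the thread says `HiveMetadata#pushdownFilter` currently does) makes every partition a candidate, which is exactly the missed pruning opportunity.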
Only a minor comment for "Implement pushdownFilter in Hive connector".
I didn't quite dive into the logic of `pushdownFilter`, though I know the idea from a high-level point of view. Test cases for that function could be helpful, plus having Aria folks verify the logic of `pushdownFilter`. Otherwise LGTM.
@@ -1723,6 +1740,90 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
    return true;
}

@Override
public boolean isPushdownFilterSupported(ConnectorSession session, ConnectorTableHandle tableHandle)
Can we add unit tests or integration tests for this code path?
@highker This path is not fully implemented yet. Only the planner portion is available. Hence, end-to-end queries don't work yet.
It would be quite helpful to have a planner test for filter pushdown. The existing testing frameworks appear to be insufficient, though. TestLogicalPlanner only works with the TPCH connector, which doesn't support filter pushdown. In particular, plan tests rely on the LocalQueryRunner#createPlan method that returns a Plan object. All of the Hive tests use DistributedQueryRunner, which doesn't have a createPlan method. DistributedQueryRunner can only be used to fetch a textual representation of the plan by running EXPLAIN queries. I'm going to take a look to see if there is a way to implement createPlan that returns a Plan object with (Hive)DistributedQueryRunner. In the meantime, if you have other ideas about how to write such tests, let me know.
@highker I figured that I can add
@wenleix The SPI changes in the PR will be superseded by the
Force-pushed from de4ead5 to 3e74ceb.
@highker @hellium01 James, Yi, thank you for reviewing this PR. I made the following changes to address your feedback:
It feels like adding ColumnReferenceExpression is not strictly necessary for this PR. It is also not clear how exactly it will be used once the connector is working with the entire plan. Hence, I'd rather not include it in this PR.
Force-pushed from 3e74ceb to 6b76c91.
presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java
"Introduce pushdownFilter SPI" Initial questions.
public class PushdownFilterResult
{
    private final TableLayout layout;
    private final RowExpression unenforcedConstraint;
nit: What about `unenforcedFilter`, since this now serves the pushdown filter purpose?
@wenleix Sure. I'll rename it.
        WarningCollector.NOOP,
        false);

BiMap<VariableReferenceExpression, VariableReferenceExpression> symbolToColumnMapping = node.getAssignments().entrySet().stream()
Curious: why do we need a `BiMap` here? It looks like `RowExpressionNodeInliner.replaceExpression` requires a `Map` as the last parameter?
@wenleix The inverted mapping is used to convert unenforcedConstraint into an expression that is used to create a FilterNode on top of the TableScanNode. The call to `.inverse()` got lost in the last round of changes, and the test didn't catch it because the Hive connector never returns an unenforcedConstraint that's not TRUE. I added the missing call.
RowExpression unenforcedConstraint = pushdownFilterResult.getUnenforcedConstraint();
if (!TRUE.equals(unenforcedConstraint)) {
    return new FilterNode(idAllocator.getNextId(), tableScan, replaceExpression(unenforcedConstraint, symbolToColumnMapping.inverse()));
}
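Why a `BiMap` helps here can be shown with two plain `HashMap`s: the forward mapping translates plan symbols into column names before calling the connector, and the inverted mapping translates the connector's unenforced filter back into plan symbols. With Guava's `BiMap` the second map is just `symbolToColumnMapping.inverse()`; this dependency-free sketch builds the inverse by hand and uses a toy string-based stand-in for `replaceExpression`:

```java
import java.util.HashMap;
import java.util.Map;

public class InverseMappingDemo
{
    // Apply a renaming map to a whitespace-separated expression string
    // (toy stand-in for replaceExpression over a RowExpression tree)
    static String replaceSymbols(String expression, Map<String, String> mapping)
    {
        StringBuilder result = new StringBuilder();
        for (String token : expression.split(" ")) {
            result.append(mapping.getOrDefault(token, token)).append(" ");
        }
        return result.toString().trim();
    }

    public static void main(String[] args)
    {
        // forward: plan symbol -> connector column name
        Map<String, String> symbolToColumn = new HashMap<>();
        symbolToColumn.put("orderkey_17", "orderkey");

        // inverse: connector column name -> plan symbol (BiMap.inverse() in the real code)
        Map<String, String> columnToSymbol = new HashMap<>();
        symbolToColumn.forEach((symbol, column) -> columnToSymbol.put(column, symbol));

        // translate into connector terms, "push down", then map the unenforced
        // remainder back into plan symbols for the FilterNode
        String planPredicate = "orderkey_17 > 100";
        String connectorPredicate = replaceSymbols(planPredicate, symbolToColumn);
        String filterNodePredicate = replaceSymbols(connectorPredicate, columnToSymbol);

        System.out.println(connectorPredicate);
        System.out.println(filterNodePredicate);
    }
}
```

Forgetting the inversion (the bug described above) would build the FilterNode in connector column terms instead of plan symbols, which only surfaces once a connector actually returns a non-TRUE unenforced filter.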
Force-pushed from 6b76c91 to 64b8b8f.
That's the plan: have the engine use either pushdownFilter or getTableLayouts consistently, based on isPushdownFilterSupported. In this PR I replaced getTableLayouts with pushdownFilter in PickTableLayout. Subsequent PRs will replace getTableLayouts calls in other places. To clarify, even when filter = TRUE, getTableLayouts cannot be used in place of pushdownFilter because it doesn't provide the "current" state (ConnectorTableLayoutHandle), and therefore the connector cannot know whether a filter was pushed down before.
Force-pushed from cc1ffd5 to 09c37bd.
Implement pushdownFilter in Hive connector
LGTM given the discussions going on
private boolean isPushdownFilterEnabled(ConnectorSession session, ConnectorTableHandle tableHandle)
{
    boolean pushdownFilterEnabled = HiveSessionProperties.isPushdownFilterEnabled(session);
Static import
I don't think I can do that. It will clash with this method.
@@ -102,7 +107,9 @@ public HiveSplit(
    this.readBucketNumber = readBucketNumber;
    this.tableBucketNumber = tableBucketNumber;
    this.forceLocalScheduling = forceLocalScheduling;
    this.effectivePredicate = effectivePredicate;
    this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null");
I would keep the current convention and put the checks at the beginning
Will do.
"Introduce pushdownFilter SPI" LGTM
Force-pushed from 09c37bd to cc8439c.
@tdcmeehan @highker Added the cool new
Introduce the `pushdownFilter` metadata API to enable more efficient filter processing for ORC data inside of the Hive connector. Also, introduce the `isPushdownFilterSupported` API to allow connectors to opt in to the new functionality. By default, `isPushdownFilterSupported` returns `false` and `pushdownFilter` is not invoked. When `isPushdownFilterSupported` returns true, the engine invokes `pushdownFilter` instead of the `getTableLayouts` API. The connector encodes the pushed-down filter into the `ConnectorTableLayoutHandle`.

This PR introduces the new APIs, updates the `PickTableLayouts` optimizer rule to use them, and provides a minimal implementation for the Hive connector.

The implementation in the Hive connector covers primarily the optimizer part; there is no logic to evaluate the pushed-down filter during execution yet. The information about the filter is propagated via `HiveSplit`, and this PR includes the necessary plumbing. The original code path is modified slightly to compact range filters during execution instead of planning. This is done to reduce the discrepancy between the two code paths in the planning phase.

Aside from the `PickTableLayouts` rule, there are a number of places that invoke the `getTableLayout` API. These will be updated in subsequent PRs.
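The opt-in dispatch described above can be sketched as follows. The interfaces and signatures are simplified stand-ins for the real `ConnectorMetadata` SPI (which takes sessions, handles, and `RowExpression`s, not strings); only the default-to-false behavior and the either/or dispatch are taken from the PR description:

```java
public class PushdownDispatchDemo
{
    interface ConnectorMetadata
    {
        // By default the new SPI is not supported, so existing connectors are unaffected
        default boolean isPushdownFilterSupported(String tableHandle)
        {
            return false;
        }

        default String pushdownFilter(String tableHandle, String filter)
        {
            throw new UnsupportedOperationException("isPushdownFilterSupported is false");
        }

        String getTableLayouts(String tableHandle);
    }

    // Engine-side rule: invoke exactly one of the two APIs, based on the opt-in flag
    static String planScan(ConnectorMetadata metadata, String tableHandle, String filter)
    {
        if (metadata.isPushdownFilterSupported(tableHandle)) {
            return metadata.pushdownFilter(tableHandle, filter);
        }
        return metadata.getTableLayouts(tableHandle);
    }

    public static void main(String[] args)
    {
        // A legacy connector only implements getTableLayouts and inherits the defaults
        ConnectorMetadata legacy = tableHandle -> "layout(" + tableHandle + ")";

        // A pushdown-aware connector opts in and encodes the filter into its layout handle
        ConnectorMetadata pushdownAware = new ConnectorMetadata()
        {
            @Override
            public boolean isPushdownFilterSupported(String tableHandle)
            {
                return true;
            }

            @Override
            public String pushdownFilter(String tableHandle, String filter)
            {
                return "layout(" + tableHandle + ", filter=" + filter + ")";
            }

            @Override
            public String getTableLayouts(String tableHandle)
            {
                return "layout(" + tableHandle + ")";
            }
        };

        System.out.println(planScan(legacy, "orders", "orderkey > 100"));
        System.out.println(planScan(pushdownAware, "orders", "orderkey > 100"));
    }
}
```

Using a `default` method is what makes the change backward compatible: connectors that never heard of filter pushdown keep compiling and keep taking the `getTableLayouts` path.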