Introduce pushdownFilter SPI #12875

Merged: mbasmanova merged 2 commits into prestodb:master from pushdown-filter on Jun 5, 2019
Conversation

@mbasmanova (Contributor)

Introduce a pushdownFilter metadata API to enable more efficient filter processing for ORC data inside the Hive connector. Also introduce an isPushdownFilterSupported API to allow connectors to opt in to the new functionality. By default, isPushdownFilterSupported returns false and pushdownFilter is not invoked. When isPushdownFilterSupported returns true, the engine invokes pushdownFilter instead of the getTableLayouts API. The connector encodes the pushed-down filter into the ConnectorTableLayoutHandle.

This PR introduces the new APIs, updates the PickTableLayout optimizer rule to use them, and provides a minimal implementation for the Hive connector.

The implementation in the Hive connector covers primarily the optimizer part; there is no logic yet to evaluate the pushed-down filter during execution. The information about the filter is propagated via HiveSplit, and this PR includes the necessary plumbing. The original code path is modified slightly to compact range filters during execution instead of planning, in order to reduce the discrepancy between the two code paths in the planning phase.

Aside from the PickTableLayout rule, there are a number of other places that invoke the getTableLayout API. These will be updated in subsequent PRs.
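
For reference, here is a minimal sketch of the SPI surface described above, pieced together from the snippets quoted later in this conversation. It is not the PR's exact code: the pushdownFilter signature and the shape of its result are assumptions for illustration.

    // excerpt-style sketch, not the complete ConnectorMetadata interface
    public interface ConnectorMetadata
    {
        /**
         * Experimental: if true, the engine will invoke pushdownFilter instead of getTableLayouts.
         * Expected to be replaced once FilterNode moves into the SPI (#12546).
         */
        default boolean isPushdownFilterSupported(ConnectorSession session, ConnectorTableHandle tableHandle)
        {
            return false;
        }

        /**
         * Experimental: invoked instead of getTableLayouts when isPushdownFilterSupported returns true.
         * The connector encodes the enforced part of the filter into its ConnectorTableLayoutHandle and
         * returns a result pairing the chosen layout with the unenforced remainder, which the engine
         * evaluates in a FilterNode above the scan.
         * Note: the actual method may also receive the connector's current ConnectorTableLayoutHandle,
         * as discussed later in this conversation.
         */
        default PushdownFilterResult pushdownFilter(ConnectorSession session, ConnectorTableHandle tableHandle, RowExpression filter)
        {
            throw new UnsupportedOperationException();
        }
    }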

@highker (Contributor) left a comment

high-level comments on "Introduce pushdownFilter SPI"

@@ -94,6 +95,10 @@
*/
TableHandle getAlternativeTableHandle(Session session, TableHandle tableHandle, PartitioningHandle partitioningHandle);

boolean isPushdownFilterSupported(Session session, TableHandle tableHandle);
Contributor

When the filter node and scan node get moved to the SPI, this interface can be replaced with a connector optimizer rule (by moving the predicate in the filter node into the table scan's table layout). So shall we add a javadoc to these two interfaces indicating that they will be removed once we have plan nodes in the SPI (ticket #12546)?

Contributor

ditto for pushdownFilter

Contributor

@tdcmeehan will work on an Experimental annotation for future use like this. But I'm totally fine having this English "Experimental" in the javadoc for now.

Contributor

We can add it now if we merge #12904 :D

@@ -239,6 +273,66 @@ public Result apply(TableScanNode tableScanNode, Captures captures, Context cont
}
}

private static final class TranslateVariableNamesVisitor
Contributor

Actually @hellium01 has a nice util to do the same thing (21af132) with test coverage. Could you check that out? I have asked him to separate that out as a single PR (#12881)

Contributor

It might change in the near future as @rongrong is replacing all Symbol with VariableExpression.

Contributor

You have TypeProvider here, so you can actually construct a proper VariableReferenceExpressionInliner. It's actually not safe to use the reference variable's type as the inlined variable's type (I've seen test failures related to this). So the proper way to do it is to use TypeProvider to construct the VariableReferenceExpression and use the variable mapping as context.

Contributor

I think what you actually need is something like RowExpressionNodeInliner to replace VariableReferenceExpression with ColumnReferenceExpression back/forth?

Contributor Author

@rongrong @highker @hellium01 Yi, I found RowExpressionNodeInliner. It allows one to replace one RowExpression with another. I could use it to replace VariableReferenceExpression with VariableReferenceExpression, but I don't see ColumnReferenceExpression. Are you suggesting to introduce one?

Contributor

I thought we already introduced ColumnReferenceExpression that uses ColumnHandle. If not, can you add support for that? It is going to be very useful for pushdown cases.

/**
* Experimental: if true, the engine will invoke pushdownFilter instead of getTableLayouts.
*/
default boolean isPushdownFilterSupported(ConnectorSession session, ConnectorTableHandle tableHandle)
Contributor

Maybe add a comment saying that it will be replaced once FilterNode is in SPI (#12546)

Contributor

Curious: why does this SPI method need the tableHandle?

  • For now it returns true for partitioned tables and false for unpartitioned tables. However, the predicate being pushed down might not include partition columns, so this check doesn't seem necessary to me?
  • In the future we can push down complex type extractions, so any table with a complex type is a candidate for filter pushdown.

Contributor Author

@wenleix In this iteration of Aria, filter pushdown is supported only for tables stored using ORC or DWRF formats. In the future, we hope to add support for Parquet. Technically speaking, I could move that check into the pushdownFilter method itself, but it seems cleaner and safer to have it here.
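
Purely for illustration, a hypothetical sketch of such a format-based gate in HiveMetadata; getStorageFormat is an assumed helper, and the exact gating in the PR may differ (the session-property toggle appears further down in this review):

    @Override
    public boolean isPushdownFilterSupported(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        // honor the hive session property discussed later in this conversation
        if (!HiveSessionProperties.isPushdownFilterEnabled(session)) {
            return false;
        }
        // Aria filter pushdown currently targets the ORC/DWRF readers only;
        // getStorageFormat is a hypothetical helper that looks up the table's storage format
        HiveStorageFormat format = getStorageFormat(session, tableHandle);
        return format == HiveStorageFormat.ORC || format == HiveStorageFormat.DWRF;
    }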

@highker self-requested a review May 30, 2019 19:52
@highker (Contributor) left a comment

Finished "Introduce pushdownFilter SPI".

Can we have test coverage on this change? Rather than putting the test in TestPickTableLayout (which only aims to test a single rule), can we have the test in TestLogicalPlanner or any class inheriting BasePlanTest? For such case, we can run the entire planner.

The reason to test against the entire planner rather than a single rule is the future refactoring that will move the filter pushdown logic from PickTableLayout to a ConnectorOptimizer; that way we still have valid test coverage afterwards.

return Result.empty();
}

Session session = context.getSession();
if (metadata.isPushdownFilterSupported(session, tableHandle)) {
PushdownFilterResult pushdownFilterResult = metadata.pushdownFilter(session, tableHandle, TRUE);
Contributor

If we push TRUE into the table scan, we are not expecting to get back any unenforced constraint or an isNone() case, right? Or I might have missed something.

Contributor

Yes, it seems like that should be the case.

@mbasmanova (Contributor Author) Jun 3, 2019

@highker @hellium01 This is existing logic. If the table is empty and the connector has a way to detect this with certainty, it will return none. Here is the corresponding snippet of code for the original code path.

            TableLayoutResult layout = metadata.getLayout(
                    session,
                    tableHandle,
                    Constraint.alwaysTrue(),
                    Optional.of(tableScanNode.getOutputSymbols().stream()
                            .map(tableScanNode.getAssignments()::get)
                            .collect(toImmutableSet())));

            if (layout.getLayout().getPredicate().isNone()) {
                return Result.ofPlanNode(new ValuesNode(context.getIdAllocator().getNextId(), tableScanNode.getOutputSymbols(), ImmutableList.of()));
            }

Contributor

Correct me if I am wrong, but it looks to me like lines 234-245 are doing roughly the same thing as lines 248-266, just using pushdownFilter with TRUE instead of getLayout with Constraint.alwaysTrue().

I am curious whether we can just use the original logic from lines 248-266? My understanding is that metadata.getLayout is a more universal method implemented for every connector, while the pushdownFilter method is more suitable for pushPredicateIntoTableScan.

Or is the idea here that, for a connector that supports pushdownFilter, pushdownFilter supersedes getLayout for consistency?

.collect(toImmutableBiMap(
entry -> entry.getKey().getName(),
entry -> getColumnName(session, metadata, node.getTable(), entry.getValue())));
RowExpression translatedPredicate = SqlToRowExpressionTranslator.translate(predicate, predicateTypes, ImmutableMap.of(), metadata.getFunctionManager(), metadata.getTypeManager(), session, false)
Contributor

Let's see if this can be altered based on @rongrong's comment (79b8320#r289458788).

RowExpression translatedPredicate = SqlToRowExpressionTranslator.translate(predicate, predicateTypes, ImmutableMap.of(), metadata.getFunctionManager(), metadata.getTypeManager(), session, false)
.accept(new TranslateVariableNamesVisitor(), symbolToColumnNameMap);

PushdownFilterResult pushdownFilterResult = metadata.pushdownFilter(session, node.getTable(), translatedPredicate);
@hellium01 (Contributor) May 31, 2019

I haven't gone into the HiveConnector's code, but it looks like this will replace the old pickTableLayouts. There are a few things to be careful about. One is that there are a few hacks in pushing down predicates on partition keys that evaluate complicated predicates instead of returning a huge TupleDomain; that's why the constraint is passed in with isCandidate. Otherwise, I am all for having a simplified interface.

Perhaps someone with deep knowledge of the HiveConnector can chime in on that.

@hellium01 (Contributor) May 31, 2019

Another place that might need to change is MetadataQueryOptimizer. Otherwise, we will have to keep both methods in HiveConnector.

Contributor Author

@hellium01 Yi, indeed, there are a number of places that invoke Metadata.getLayout, and all of these will need to be modified to use pushdownFilter as well. One option would be to fold filterPushdown into Metadata.getLayout. I plan on working through this in a separate PR, because I feel this PR is pretty big already. I would also like to set things up so that multiple people can work in parallel, e.g. once this PR lands, one person can work on the above while another works on subfield pruning and/or implementing filter evaluation in the Hive connector.

Re: isCandidate - My understanding is that the new code path (via pushdownFilter) is missing out on some opportunities to prune partitions using deterministic expressions, e.g. SELECT * FROM t WHERE length(p) > 10. That's because HiveMetadata#pushdownFilter doesn't specify any predicate when constructing the Constraint object. How about I add a TODO to add logic to specify the predicate?

        HivePartitionResult hivePartitionResult = partitionManager.getPartitions(metastore, tableHandle, new Constraint<>(entireColumnDomain), session);
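
A hedged sketch of what that TODO could look like, assuming Constraint's predicate-taking constructor: pass a row-level predicate alongside the TupleDomain so the partition manager can also prune on deterministic expressions such as length(p) > 10. The evaluator below is hypothetical; deriving it from the pushed-down RowExpression is exactly the missing piece.

        // hypothetical: evaluates the deterministic part of the pushed-down filter against candidate partition values
        Predicate<Map<ColumnHandle, NullableValue>> partitionPredicate = partitionValues -> evaluator.evaluate(partitionValues);

        HivePartitionResult hivePartitionResult = partitionManager.getPartitions(
                metastore,
                tableHandle,
                new Constraint<>(entireColumnDomain, partitionPredicate),
                session);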

@highker self-requested a review May 31, 2019 21:07
@highker (Contributor) left a comment

Only a minor comment for "Implement pushdownFilter in Hive connector".

I didn't quite dive into the logic of pushdownFilter, though I know the idea at a high level. Test cases for that function would be helpful, plus having the Aria folks verify the logic of pushdownFilter. Otherwise LGTM.

@@ -1723,6 +1740,90 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
return true;
}

@Override
public boolean isPushdownFilterSupported(ConnectorSession session, ConnectorTableHandle tableHandle)
Contributor

Can we add unit tests or integration tests for this code path?

Contributor Author

@highker This path is not fully implemented yet. Only the planner portion is available. Hence, end-to-end queries don't work yet.

@mbasmanova (Contributor Author)

@highker

Can we have test coverage on this change? Rather than putting the test in TestPickTableLayout (which only aims to test a single rule), can we have the test in TestLogicalPlanner or any class inheriting BasePlanTest? For such case, we can run the entire planner.

It would be quite helpful to have a planner test for filter pushdown. The existing testing frameworks appear to be insufficient though. TestLogicalPlanner only works with the TPCH connector, which doesn't support filter pushdown. In particular, plan tests rely on the LocalQueryRunner#createPlan method that returns a Plan object, while all of the Hive tests use DistributedQueryRunner, which doesn't have a createPlan method. DistributedQueryRunner can only be used to fetch a textual representation of the plan by running EXPLAIN queries. I'm going to take a look to see if there is a way to implement createPlan returning a Plan object with (Hive)DistributedQueryRunner. In the meantime, if you have other ideas about how to write such tests, let me know.

@mbasmanova (Contributor Author)

mbasmanova commented Jun 3, 2019

@highker I figured out that I can add an assertPlan method to AbstractTestQueryFramework, which will allow me to write planner tests using the Hive connector (similar to TestLogicalPlanner). I'm going to add such tests and update the PR.

    public void assertPlan(String query, PlanMatchPattern pattern)
    {
        assertPlan(queryRunner.getDefaultSession(), query, pattern);
    }

    public void assertPlan(Session session, String query, PlanMatchPattern pattern)
    {
        QueryExplainer explainer = getQueryExplainer();
        transaction(queryRunner.getTransactionManager(), queryRunner.getAccessControl())
                .singleStatement()
                .execute(session, transactionSession -> {
                    Plan actualPlan = explainer.getLogicalPlan(transactionSession, sqlParser.createStatement(query, createParsingOptions(transactionSession)), emptyList(), WarningCollector.NOOP);
                    PlanAssert.assertPlan(transactionSession, queryRunner.getMetadata(), queryRunner.getStatsCalculator(), actualPlan, pattern);
                    return null;
                });
    }
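
A hypothetical usage of the helper above, assuming a Hive-backed test class that extends AbstractTestQueryFramework; the table and plan pattern are illustrative only:

    import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
    import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;

    @Test
    public void testFilterPushdown()
    {
        // sanity-check that the query plans against the Hive table; a stricter pattern could
        // additionally assert that no FilterNode remains above the scan once pushdown kicks in
        assertPlan(
                "SELECT linenumber FROM lineitem WHERE linenumber = 1",
                anyTree(tableScan("lineitem")));
    }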

@wenleix (Contributor)

wenleix commented Jun 3, 2019

This PR is bundled with #12729 to support filter pushdown, and will be superseded once Connector can participate in planning (see #12546 for details), right?

@mbasmanova (Contributor Author)

@wenleix The SPI changes in this PR will eventually be superseded by the "Connector can participate in planning" work.

@mbasmanova force-pushed the pushdown-filter branch 2 times, most recently from de4ead5 to 3e74ceb on June 3, 2019 21:24
@mbasmanova (Contributor Author)

@highker @hellium01 James, Yi, thank you for reviewing this PR. I made the following changes to address your feedback:

  • Added a TestHiveLogicalPlanner test to verify the planner part of the pushdown with the Hive connector.
  • Replaced TranslateVariableNamesVisitor with RowExpressionNodeInliner#replaceExpression.
  • Added a TODO about pruning partitions using non-tuple-domain predicates.

It feels like adding ColumnReferenceExpression is not strictly necessary for this PR. It is also not clear how exactly it would be used once the connector works with the entire plan. Hence, I'd rather not include it in this PR.

@wenleix (Contributor) left a comment

"Introduce pushdownFilter SPI" Initial questions.

public class PushdownFilterResult
{
private final TableLayout layout;
private final RowExpression unenforcedConstraint;
Contributor

nit: What about unenforcedFilter, since this now serves the filter pushdown purpose?

Contributor Author

@wenleix Sure. I'll rename it.

WarningCollector.NOOP,
false);

BiMap<VariableReferenceExpression, VariableReferenceExpression> symbolToColumnMapping = node.getAssignments().entrySet().stream()
Contributor

Curious: why do we need a BiMap here? It looks like RowExpressionNodeInliner.replaceExpression takes a Map as the last parameter.

Contributor Author

@wenleix The inverted mapping is used to convert unenforcedConstraint into an expression that is used to create a FilterNode on top of the TableScanNode. The call to .inverse() got lost in the last round of changes, and the test didn't catch it because the Hive connector never returns an unenforcedConstraint that isn't TRUE. I added the missing call.

            RowExpression unenforcedConstraint = pushdownFilterResult.getUnenforcedConstraint();
            if (!TRUE.equals(unenforcedConstraint)) {
                return new FilterNode(idAllocator.getNextId(), tableScan, replaceExpression(unenforcedConstraint, symbolToColumnMapping.inverse()));
            }
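
In other words (a hedged paraphrase of the snippets above, not the rule's exact code), the same BiMap is applied in both directions around the pushdownFilter call:

            // forward: rewrite the predicate in terms of column-named variables before handing it to the connector
            RowExpression translatedPredicate = replaceExpression(predicate, symbolToColumnMapping);
            PushdownFilterResult pushdownFilterResult = metadata.pushdownFilter(session, node.getTable(), translatedPredicate);

            // inverse: map the unenforced remainder back to plan symbols before wrapping the scan in a FilterNode
            RowExpression unenforced = replaceExpression(pushdownFilterResult.getUnenforcedConstraint(), symbolToColumnMapping.inverse());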

@highker self-requested a review June 3, 2019 23:38
@mbasmanova (Contributor Author)

@wenleix

Or, is the idea here that for a connector supports pushdownFilter, this pushdownFilter supersedes getLayout for consistency purpose?

That's the plan: have the engine use either pushdownFilter or getTableLayouts consistently based on isPushdownFilterSupported. In this PR I replaced getTableLayouts with pushdownFilter in PickTableLayout. Subsequent PRs will be replacing getTableLayouts calls in other places.

To clarify, even when filter = TRUE, getTableLayouts cannot be used in place of pushdownFilter because it doesn't provide the "current" state (ConnectorTableLayoutHandle) and therefore the connector cannot know if a filter was pushed down before.
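
A hedged sketch of that dispatch, simplified from the snippets earlier in this thread (the getter names on PushdownFilterResult are assumed):

            if (metadata.isPushdownFilterSupported(session, tableHandle)) {
                // new path: push the (possibly TRUE) filter; the connector records it in its ConnectorTableLayoutHandle
                PushdownFilterResult pushdownFilterResult = metadata.pushdownFilter(session, tableHandle, TRUE);
                if (pushdownFilterResult.getLayout().getPredicate().isNone()) {
                    return Result.ofPlanNode(new ValuesNode(context.getIdAllocator().getNextId(), tableScanNode.getOutputSymbols(), ImmutableList.of()));
                }
                // ...rebuild the TableScanNode from the returned layout and add a FilterNode for any unenforced remainder
            }
            else {
                // original path for connectors that have not opted in
                TableLayoutResult layout = metadata.getLayout(session, tableHandle, Constraint.alwaysTrue(), Optional.empty());
                // ...existing getTableLayouts-based handling
            }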

@mbasmanova force-pushed the pushdown-filter branch 3 times, most recently from cc1ffd5 to 09c37bd on June 4, 2019 20:36
@tdcmeehan (Contributor) left a comment

"Implement pushdownFilter in Hive connector" LGTM, given the discussions going on.


private boolean isPushdownFilterEnabled(ConnectorSession session, ConnectorTableHandle tableHandle)
{
boolean pushdownFilterEnabled = HiveSessionProperties.isPushdownFilterEnabled(session);
Contributor

Static import

Contributor Author

I don't think I can do that. It will clash with this method.

@@ -102,7 +107,9 @@ public HiveSplit(
this.readBucketNumber = readBucketNumber;
this.tableBucketNumber = tableBucketNumber;
this.forceLocalScheduling = forceLocalScheduling;
this.effectivePredicate = effectivePredicate;
this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null");
Contributor

I would keep the current convention and put the checks at the beginning

Contributor Author

Will do.

@highker (Contributor) left a comment

"Introduce pushdownFilter SPI" LGTM

@mbasmanova (Contributor Author)

@tdcmeehan @highker Added the cool new @Experimental annotation to the methods in ConnectorMetadata.

@mbasmanova mbasmanova merged commit 76ea27e into prestodb:master Jun 5, 2019
@mbasmanova mbasmanova deleted the pushdown-filter branch June 5, 2019 15:58