Local dynamic filter support for Iceberg #9538

Merged (1 commit) on Oct 14, 2021

alexjo2144 commented Oct 6, 2021

Issue: #4115
An original PR with the same changes: #5719

I think this should wait for #9193. The tests are fairly flaky right now because they rely on the left side of the join happening fast enough for the dynamic filter (DF) to be useful. Once that PR is merged, we could add a DF wait time.
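
For context on the timing issue: a dynamic filter only helps a scan if the join keys it is built from are available by the time that scan runs. The sketch below is a hypothetical, self-contained illustration of that race and of what a wait time changes. `DynamicFilterSketch`, `scanProbe`, and `waitMillis` are made-up names for illustration, not Trino APIs, and the logic is a simplification rather than how the engine implements it.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names come from Trino.
final class DynamicFilterSketch
{
    private DynamicFilterSketch() {}

    // Scans the probe keys, waiting up to waitMillis for the set of join keys.
    // If the key set is not ready in time, the scan falls back to returning every row.
    static List<Long> scanProbe(List<Long> probeKeys, CompletableFuture<Set<Long>> joinKeys, long waitMillis)
    {
        Set<Long> filter;
        try {
            filter = joinKeys.get(waitMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            // The filter arrived too late to be useful: no rows are pruned.
            return probeKeys;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return probeKeys;
        }
        catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        // The filter is available: keep only rows whose key can match the join.
        return probeKeys.stream()
                .filter(filter::contains)
                .collect(Collectors.toList());
    }
}
```

With a zero wait and a key set that is not yet complete, the scan returns all rows, which is the situation that makes a timing-sensitive test flaky. A nonzero wait gives the filter a chance to arrive before the scan proceeds.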

Member

findepi left a comment

LGTM % ensuring test determinism.

timeout);
}

protected void assertQueryStats(
Member

We could consider adding another method dedicated to operator stats assertions:

    protected void assertOperatorStats(
            Session session,
            @Language("SQL") String query,
            Predicate<PlanNode> planNodeMatcher,
            Consumer<List<OperatorStats>> operatorStatsAssertion,
            Consumer<MaterializedResult> resultAssertion,
            Duration timeout)
    {
        // TODO: replace this with a simple operator stats check once we find a way to wait until all pending updates to query stats have been applied
        // (might be fixed by https://github.com/trinodb/trino/issues/5172)
        assertEventually(timeout, () -> {
            DistributedQueryRunner queryRunner = getDistributedQueryRunner();
            ResultWithQueryId<MaterializedResult> resultWithQueryId = queryRunner.executeWithQueryId(session, query);
            QueryId queryId = resultWithQueryId.getQueryId();
            Plan plan = getDistributedQueryRunner().getQueryPlan(queryId);
            PlanNodeId nodeId = PlanNodeSearcher.searchFrom(plan.getRoot())
                    .where(planNodeMatcher)
                    .findOnlyElement()
                    .getId();
            List<OperatorStats> operatorStats = getDistributedQueryRunner().getCoordinator()
                    .getQueryManager()
                    .getFullQueryInfo(queryId)
                    .getQueryStats()
                    .getOperatorSummaries()
                    .stream()
                    .filter(summary -> nodeId.equals(summary.getPlanNodeId()))
                    .collect(toImmutableList());
            operatorStatsAssertion.accept(operatorStats);
            resultAssertion.accept(resultWithQueryId.getResult());
        });
    }
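
The `assertEventually` wrapper in the snippet above re-runs the whole block until its assertions pass or the timeout elapses, which tolerates query stats that are still being updated when the query returns. A minimal sketch of such a retry helper follows. It assumes nothing about Trino's actual implementation: the class name `EventuallySketch` and the 50 ms pause between attempts are illustrative choices.

```java
import java.time.Duration;

// Hypothetical stand-in for a retrying assertion helper; this is not Trino's assertEventually.
final class EventuallySketch
{
    private EventuallySketch() {}

    // Re-runs the assertion until it passes or the timeout elapses,
    // then rethrows the most recent failure.
    static void assertEventually(Duration timeout, Runnable assertion)
    {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return;
            }
            catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e;
                }
            }
            try {
                // Brief pause so pending updates have a chance to land before the next attempt.
                Thread.sleep(50);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting for assertion to pass", e);
            }
        }
    }
}
```

Note that each retry re-executes everything inside the lambda, so in the snippet above the query itself runs again on every attempt.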

@alexjo2144
Member Author

Moved the test into BaseIcebergConnectorTest and added lineitem as a table requirement there. Also added a split count assertion to make sure that no file-level filtering is happening.

> To make it (more) deterministic, one needs to use a large probe side, to work around the fact that we don't wait for anything.

It's already a select * from lineitem so I'm not sure we can go much bigger with the tpch tables we have in the connector test

@findepi
Member

findepi commented Oct 8, 2021

> It's already a select * from lineitem so I'm not sure we can go much bigger with the tpch tables we have in the connector test

query runner factory can provision tables from sf1 only, but the test can 'manually' use any scale factor we want (within reasonable execution time limits).

@findepi
Member

findepi commented Oct 8, 2021

@alexjo2144 see CI

@alexjo2144
Member Author

Thanks @findepi, I forgot I had turned off auto-optimize imports. Fixed.

@@ -395,6 +396,21 @@ protected void assertQueryStats(
Consumer<QueryStats> queryStatsAssertion,
Consumer<MaterializedResult> resultAssertion,
Duration timeout)
{
Member

typo in commit message:

Suppert assertQueryStats assertions that use queryId

Member

however, drop this commit, see comment in next

Comment on lines 2204 to 2206
(statistics, queryId) -> {
OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(
queryId,
Member

you use queryId, but you don't use statistics
this looks like abuse of assertQueryStats

let's use executeWithQueryId + searchScanFilterAndProjectOperatorStats directly
let's wrap in assertEventually, linking to #5172

findepi merged commit 0ff73f3 into trinodb:master Oct 14, 2021
github-actions bot added this to the 364 milestone Oct 14, 2021
findepi mentioned this pull request Oct 14, 2021 (12 tasks)
alexjo2144 deleted the alexjo/iceberg-df branch October 14, 2021 20:00