Skip to content

Commit

Permalink
ESQL: Push down filter past lookup join (elastic#118410)
Browse files Browse the repository at this point in the history
Improve the planner to detect filters that can be pushed down 'through'
 a LOOKUP JOIN by determining the conditions scoped to the left/main
 side and moving them closer to the source.

Relates elastic#118305
  • Loading branch information
costin committed Dec 13, 2024
1 parent 05e4ef7 commit 3cbf208
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/118410.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118410
summary: Push down filter past lookup join
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static Expression combineAnd(List<Expression> exps) {
*
* using the given combiner.
*
* While a bit longer, this method creates a balanced tree as oppose to a plain
* While a bit longer, this method creates a balanced tree as opposed to a plain
* recursive approach which creates an unbalanced one (either to the left or right).
*/
private static Expression combine(List<Expression> exps, BiFunction<Expression, Expression, Expression> combiner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,68 @@ count:long | type:keyword
3 | Success
1 | Disconnected
;

//
// Filtering tests
//

lookupWithFilterOnLeftSideField
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| SORT emp_no
| KEEP emp_no, language_code, language_name
| WHERE emp_no >= 10091 AND emp_no < 10094
;

emp_no:integer | language_code:integer | language_name:keyword
10091 | 3 | Spanish
10092 | 1 | English
10093 | 3 | Spanish
;

lookupMessageWithFilterOnRightSideField-Ignore
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
| WHERE type == "Error"
| KEEP @timestamp, client_ip, event_duration, message, type
| SORT @timestamp DESC
;

@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword
2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error
2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error
2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error
;

lookupWithFieldAndRightSideAfterStats
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
| STATS count = count(message) BY type
| WHERE type == "Error"
;

count:long | type:keyword
3 | Error
;

lookupWithFieldOnJoinKey-Ignore
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| WHERE language_code > 1 AND language_name IS NOT NULL
| KEEP emp_no, language_code, language_name
;

emp_no:integer | language_code:integer | language_name:keyword
10001 | 2 | French
10003 | 4 | German
;
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
Expand All @@ -23,6 +24,8 @@
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -76,11 +79,63 @@ protected LogicalPlan rule(Filter filter) {
} else if (child instanceof OrderBy orderBy) {
// swap the filter with its child
plan = orderBy.replaceChild(filter.with(orderBy.child(), condition));
} else if (child instanceof Join join) {
return pushDownPastJoin(filter, join);
}
// cannot push past a Limit, this could change the tailing result set returned
return plan;
}

/**
 * Result of splitting a list of filter expressions by the side of a join they depend on.
 *
 * @param commonFilters expressions that were classified as neither left-only nor right-only
 * @param leftFilters   expressions whose references are all contained in the left side's output
 * @param rightFilters  expressions (not already classified as left) whose references are all
 *                      contained in the right side's output
 */
private record ScopedFilter(List<Expression> commonFilters, List<Expression> leftFilters, List<Expression> rightFilters) {}

/**
 * Partitions the given filter expressions according to which side of a join they depend on.
 * <p>
 * Each expression lands in exactly one bucket, checked in this order:
 * <ol>
 *   <li>left - every reference of the expression is part of the left plan's output</li>
 *   <li>right - every reference of the expression is part of the right plan's output</li>
 *   <li>common - anything else, i.e. the expression was not fully covered by either side alone</li>
 * </ol>
 * An expression that qualifies for both left and right is classified as left. The relative
 * order of the incoming expressions is kept inside each bucket.
 *
 * @param filters the individual filter expressions to classify
 * @param left    the left child of the join
 * @param right   the right child of the join
 * @return the three buckets wrapped in a {@link ScopedFilter}
 */
private static ScopedFilter scopeFilter(List<Expression> filters, LogicalPlan left, LogicalPlan right) {
    AttributeSet leftOutput = left.outputSet();
    AttributeSet rightOutput = right.outputSet();

    List<Expression> leftScoped = new ArrayList<>();
    List<Expression> rightScoped = new ArrayList<>();
    List<Expression> unscoped = new ArrayList<>();

    for (Expression candidate : filters) {
        if (candidate.references().subsetOf(leftOutput)) {
            // left takes precedence over right
            leftScoped.add(candidate);
        } else if (candidate.references().subsetOf(rightOutput)) {
            rightScoped.add(candidate);
        } else {
            unscoped.add(candidate);
        }
    }

    return new ScopedFilter(unscoped, leftScoped, rightScoped);
}

/**
 * Tries to move the left-scoped part of a filter that sits on top of a join below that join.
 * <p>
 * Only {@code LEFT} joins are handled; for any other join type, or when no part of the condition
 * is scoped to the left side, the original filter is returned untouched.
 *
 * @param filter the filter whose child is {@code join}
 * @param join   the join directly underneath the filter
 * @return the original filter when nothing can be pushed down; otherwise the join with a filter
 *         added on its left child, wrapped in a filter holding the remaining conditions if any
 */
private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
    // TODO: generalize this for other join types
    if (join.config().type() != JoinTypes.LEFT) {
        return filter;
    }

    LogicalPlan leftChild = join.left();
    LogicalPlan rightChild = join.right();

    // break the condition into its AND-ed parts and classify each one as
    // left-only, right-only or needing both sides
    ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), leftChild, rightChild);
    if (scoped.leftFilters().isEmpty()) {
        // nothing is scoped to the left side, leave the plan as it is
        return filter;
    }

    // place the left-scoped conditions directly on top of the left child ...
    LogicalPlan filteredLeft = new Filter(leftChild.source(), leftChild, Predicates.combineAnd(scoped.leftFilters()));
    // ... and rebuild the join on top of it
    Join rewrittenJoin = (Join) join.replaceLeft(filteredLeft);

    // whatever was not pushed down (common and right-scoped conditions) stays above the join
    Expression remaining = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters(), scoped.rightFilters()));
    if (remaining == null) {
        // the whole condition was pushed down, so the outer filter is no longer needed
        return rewrittenJoin;
    }
    return filter.with(rewrittenJoin, remaining);
}

// Identity mapping: hands back the expression it receives, unchanged.
// Declared final so this shared constant cannot be reassigned.
private static final Function<Expression, Expression> NO_OP = expression -> expression;

private static LogicalPlan maybePushDownPastUnary(
Expand Down
Loading

0 comments on commit 3cbf208

Please sign in to comment.