-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
expression: propagate more filters in PropagateConstant #7276
Merged
+169
−73
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
a739864
expression/constant_propagation: propagate more filters in PropagateC…
bb7133 4d7d0dd
Updates based on CR
bb7133 d599bb1
Updates based on CR #2
bb7133 4f153e4
Bug fix
bb7133 fd15734
some updates on XuHuaiyu's comments:
bb7133 4d56250
Merge branch 'master' into bb7133/propagation
bb7133 64174cc
Merge branch 'master' into bb7133/propagation
shenli c7579ec
Merge branch 'master' into bb7133/propagation
eurekaka 7cfb801
Merge branch 'master' into bb7133/propagation
zz-jason File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,19 +27,6 @@ import ( | |
// MaxPropagateColsCnt means the max number of columns that can participate propagation. | ||
var MaxPropagateColsCnt = 100 | ||
|
||
var eqFuncNameMap = map[string]bool{ | ||
ast.EQ: true, | ||
} | ||
|
||
// inEqFuncNameMap stores all the in-equal operators that can be propagated. | ||
var inEqFuncNameMap = map[string]bool{ | ||
ast.LT: true, | ||
ast.GT: true, | ||
ast.LE: true, | ||
ast.GE: true, | ||
ast.NE: true, | ||
} | ||
|
||
type multiEqualSet struct { | ||
parent []int | ||
} | ||
|
@@ -72,51 +59,12 @@ type propagateConstantSolver struct { | |
ctx sessionctx.Context | ||
} | ||
|
||
// propagateInEQ propagates all in-equal conditions. | ||
// e.g. For expression a = b and b = c and c = d and c < 1 , we can get extra a < 1 and b < 1 and d < 1. | ||
// We maintain a unionSet representing the equivalent for every two columns. | ||
func (s *propagateConstantSolver) propagateInEQ() { | ||
s.unionSet = &multiEqualSet{} | ||
s.unionSet.init(len(s.columns)) | ||
for i := range s.conditions { | ||
if fun, ok := s.conditions[i].(*ScalarFunction); ok && fun.FuncName.L == ast.EQ { | ||
lCol, lOk := fun.GetArgs()[0].(*Column) | ||
rCol, rOk := fun.GetArgs()[1].(*Column) | ||
if lOk && rOk { | ||
lID := s.getColID(lCol) | ||
rID := s.getColID(rCol) | ||
s.unionSet.addRelation(lID, rID) | ||
} | ||
} | ||
} | ||
condsLen := len(s.conditions) | ||
for i := 0; i < condsLen; i++ { | ||
cond := s.conditions[i] | ||
col, con := s.validPropagateCond(cond, inEqFuncNameMap) | ||
if col == nil { | ||
continue | ||
} | ||
id := s.getColID(col) | ||
for j := range s.columns { | ||
if id != j && s.unionSet.findRoot(id) == s.unionSet.findRoot(j) { | ||
funName := cond.(*ScalarFunction).FuncName.L | ||
var newExpr Expression | ||
if _, ok := cond.(*ScalarFunction).GetArgs()[0].(*Column); ok { | ||
newExpr = NewFunctionInternal(s.ctx, funName, cond.GetType(), s.columns[j], con) | ||
} else { | ||
newExpr = NewFunctionInternal(s.ctx, funName, cond.GetType(), con, s.columns[j]) | ||
} | ||
s.conditions = append(s.conditions, newExpr) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// propagateEQ propagates equal expression multiple times. An example runs as following: | ||
// propagateConstantEQ propagates expressions like 'column = constant' by substituting the constant for column, the | ||
// procedure repeats multiple times. An example runs as following: | ||
// a = d & b * 2 = c & c = d + 2 & b = 1 & a = 4, we pick eq cond b = 1 and a = 4 | ||
// d = 4 & 2 = c & c = d + 2 & b = 1 & a = 4, we propagate b = 1 and a = 4 and pick eq cond c = 2 and d = 4 | ||
// d = 4 & 2 = c & false & b = 1 & a = 4, we propagate c = 2 and d = 4, and do constant folding: c = d + 2 will be folded as false. | ||
func (s *propagateConstantSolver) propagateEQ() { | ||
func (s *propagateConstantSolver) propagateConstantEQ() { | ||
s.eqList = make([]*Constant, len(s.columns)) | ||
visited := make([]bool, len(s.conditions)) | ||
for i := 0; i < MaxPropagateColsCnt; i++ { | ||
|
@@ -138,10 +86,72 @@ func (s *propagateConstantSolver) propagateEQ() { | |
} | ||
} | ||
|
||
// validPropagateCond checks if the cond is an expression like [column op constant] and op is in the funNameMap. | ||
func (s *propagateConstantSolver) validPropagateCond(cond Expression, funNameMap map[string]bool) (*Column, *Constant) { | ||
// propagateColumnEQ propagates expressions like 'column A = column B' by adding extra filters | ||
// 'expression(..., column B, ...)' propagated from 'expression(..., column A, ...)' as long as: | ||
// | ||
// 1. The expression is deterministic | ||
// 2. The expression doesn't have any side effect | ||
// | ||
// e.g. For expression a = b and b = c and c = d and c < 1 , we can get extra a < 1 and b < 1 and d < 1. | ||
// However, for a = b and a < rand(), we cannot propagate a < rand() to b < rand() because rand() is non-deterministic | ||
// | ||
// This propagation may bring redundancies that we need to resolve later, for example: | ||
// for a = b and a < 3 and b < 3, we get new a < 3 and b < 3, which are redundant | ||
// for a = b and a < 3 and 3 > b, we get new b < 3 and 3 > a, which are redundant | ||
// for a = b and a < 3 and b < 4, we get new a < 4 and b < 3 but should expect a < 3 and b < 3 | ||
// for a = b and a in (3) and b in (4), we get b in (3) and a in (4) but should expect 'false' | ||
// | ||
// TODO: remove redundancies later | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to add a github issue for this TODO item. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. |
||
// | ||
// We maintain a unionSet representing the equivalent for every two columns. | ||
func (s *propagateConstantSolver) propagateColumnEQ() { | ||
visited := make([]bool, len(s.conditions)) | ||
s.unionSet = &multiEqualSet{} | ||
s.unionSet.init(len(s.columns)) | ||
for i := range s.conditions { | ||
if fun, ok := s.conditions[i].(*ScalarFunction); ok && fun.FuncName.L == ast.EQ { | ||
lCol, lOk := fun.GetArgs()[0].(*Column) | ||
rCol, rOk := fun.GetArgs()[1].(*Column) | ||
if lOk && rOk { | ||
lID := s.getColID(lCol) | ||
rID := s.getColID(rCol) | ||
s.unionSet.addRelation(lID, rID) | ||
visited[i] = true | ||
} | ||
} | ||
} | ||
|
||
condsLen := len(s.conditions) | ||
for i, coli := range s.columns { | ||
for j := i + 1; j < len(s.columns); j++ { | ||
// unionSet doesn't have iterate(), we use a two layer loop to iterate col_i = col_j relation | ||
if s.unionSet.findRoot(i) != s.unionSet.findRoot(j) { | ||
continue | ||
} | ||
colj := s.columns[j] | ||
for k := 0; k < condsLen; k++ { | ||
if visited[k] { | ||
// cond_k has been used to retrieve equality relation | ||
continue | ||
} | ||
cond := s.conditions[k] | ||
replaced, _, newExpr := s.tryToReplaceCond(coli, colj, cond) | ||
if replaced { | ||
s.conditions = append(s.conditions, newExpr) | ||
} | ||
replaced, _, newExpr = s.tryToReplaceCond(colj, coli, cond) | ||
if replaced { | ||
s.conditions = append(s.conditions, newExpr) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// validEqualCond checks if the cond is an expression like [column eq constant]. | ||
func (s *propagateConstantSolver) validEqualCond(cond Expression) (*Column, *Constant) { | ||
if eq, ok := cond.(*ScalarFunction); ok { | ||
if _, ok := funNameMap[eq.FuncName.L]; !ok { | ||
if eq.FuncName.L != ast.EQ { | ||
return nil, nil | ||
} | ||
if col, colOk := eq.GetArgs()[0].(*Column); colOk { | ||
|
@@ -158,6 +168,54 @@ func (s *propagateConstantSolver) validPropagateCond(cond Expression, funNameMap | |
return nil, nil | ||
} | ||
|
||
// tryToReplaceCond aims to replace all occurrences of column 'src' and try to replace it with 'tgt' in 'cond' | ||
// It returns | ||
// bool: if a replacement happened | ||
// bool: if 'cond' contains non-deterministic expression | ||
// Expression: the replaced expression, or original 'cond' if the replacement didn't happen | ||
// | ||
// For example: | ||
// for 'a, b, a < 3', it returns 'true, false, b < 3' | ||
// for 'a, b, sin(a) + cos(a) = 5', it returns 'true, false, returns sin(b) + cos(b) = 5' | ||
// for 'a, b, cast(a) < rand()', it returns 'false, true, cast(a) < rand()' | ||
func (s *propagateConstantSolver) tryToReplaceCond(src *Column, tgt *Column, cond Expression) (bool, bool, Expression) { | ||
sf, ok := cond.(*ScalarFunction) | ||
if !ok { | ||
return false, false, cond | ||
} | ||
replaced := false | ||
var args []Expression | ||
if _, ok := unFoldableFunctions[sf.FuncName.L]; ok { | ||
return false, true, cond | ||
} | ||
for idx, expr := range sf.GetArgs() { | ||
if src.Equal(nil, expr) { | ||
replaced = true | ||
if args == nil { | ||
args = make([]Expression, len(sf.GetArgs())) | ||
copy(args, sf.GetArgs()) | ||
} | ||
args[idx] = tgt | ||
} else { | ||
subReplaced, isNonDeterminisitic, subExpr := s.tryToReplaceCond(src, tgt, expr) | ||
if isNonDeterminisitic { | ||
return false, true, cond | ||
} else if subReplaced { | ||
replaced = true | ||
if args == nil { | ||
args = make([]Expression, len(sf.GetArgs())) | ||
copy(args, sf.GetArgs()) | ||
} | ||
args[idx] = subExpr | ||
} | ||
} | ||
} | ||
if replaced { | ||
return true, false, NewFunctionInternal(s.ctx, sf.FuncName.L, sf.GetType(), args...) | ||
} | ||
return false, false, cond | ||
} | ||
|
||
func (s *propagateConstantSolver) setConds2ConstFalse() { | ||
s.conditions = []Expression{&Constant{ | ||
Value: types.NewDatum(false), | ||
|
@@ -172,7 +230,7 @@ func (s *propagateConstantSolver) pickNewEQConds(visited []bool) (retMapper map[ | |
if visited[i] { | ||
continue | ||
} | ||
col, con := s.validPropagateCond(cond, eqFuncNameMap) | ||
col, con := s.validEqualCond(cond) | ||
// Then we check if this CNF item is a false constant. If so, we will set the whole condition to false. | ||
var ok bool | ||
if col == nil { | ||
|
@@ -227,8 +285,8 @@ func (s *propagateConstantSolver) solve(conditions []Expression) []Expression { | |
log.Warnf("[const_propagation]Too many columns in a single CNF: the column count is %d, the max count is %d.", len(s.columns), MaxPropagateColsCnt) | ||
return conditions | ||
} | ||
s.propagateEQ() | ||
s.propagateInEQ() | ||
s.propagateConstantEQ() | ||
s.propagateColumnEQ() | ||
for i, cond := range s.conditions { | ||
if dnf, ok := cond.(*ScalarFunction); ok && dnf.FuncName.L == ast.LogicOr { | ||
dnfItems := SplitDNFItems(cond) | ||
|
@@ -255,7 +313,7 @@ func (s *propagateConstantSolver) insertCol(col *Column) { | |
} | ||
} | ||
|
||
// PropagateConstant propagate constant values of equality predicates and inequality predicates in a condition. | ||
// PropagateConstant propagate constant values of deterministic predicates in a condition. | ||
func PropagateConstant(ctx sessionctx.Context, conditions []Expression) []Expression { | ||
solver := &propagateConstantSolver{ | ||
colMapper: make(map[string]int), | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the present planner, we can do this by adding a post-process after the logical optimization, before deriving the statistics and build index/table ranges.
In the new cascades planner, we can do this by adding a rule which only works on a
Filter
operator and remove the duplicated filters.@bb7133 What's your opinion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current ranger can spot these redundant filters and merge them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eurekaka maybe we can do the enhancement in ranger?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like ranger would only simplify and merge these filters when they are considered to be index filters, if there is no index and no int primary key, these filters would be in PhysicalSelection as what they are originally; maybe we should extract the range simplification logic out of ranger, and this should be like the 'post-process' you mentioned above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zz-jason Sounds interesting! And looks @tiancaiamao is trying to solve (at least part of) the redundant filters in his PR, which we can expect.