Skip to content

Commit

Permalink
planner: UPDATE's select plan's output col IDs should be stable (#53268
Browse files Browse the repository at this point in the history
…) (#53274)

close #53236
  • Loading branch information
ti-chi-bot authored May 29, 2024
1 parent bf71326 commit eab1f8e
Show file tree
Hide file tree
Showing 26 changed files with 292 additions and 226 deletions.
1 change: 0 additions & 1 deletion build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ nogo(
}) +
select({
"//build:without_rbe": [
"//build/linter/filepermission",
],
"//conditions:default": [],
}),
Expand Down
3 changes: 1 addition & 2 deletions pkg/expression/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ go_library(
"//pkg/parser/opcode",
"//pkg/parser/terror",
"//pkg/parser/types",
"//pkg/planner/funcdep",
"//pkg/privilege",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
Expand All @@ -98,6 +97,7 @@ go_library(
"//pkg/util/encrypt",
"//pkg/util/generatedexpr",
"//pkg/util/hack",
"//pkg/util/intset",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/mock",
Expand All @@ -119,7 +119,6 @@ go_library(
"@com_github_pingcap_tipb//go-tipb",
"@com_github_pkg_errors//:errors",
"@com_github_tikv_client_go_v2//oracle",
"@org_golang_x_tools//container/intsets",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
26 changes: 13 additions & 13 deletions pkg/expression/grouping_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/mysql"
fd "github.com/pingcap/tidb/pkg/planner/funcdep"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/util/intset"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -178,7 +178,7 @@ func (gss GroupingSets) TargetOne(normalAggArgs []Expression) int {
return 0
}
// for other normal agg args like: count(a), count(a+b), count(not(a is null)) and so on.
normalAggArgsIDSet := fd.NewFastIntSet()
normalAggArgsIDSet := intset.NewFastIntSet()
for _, one := range columnInNormalAggArgs {
normalAggArgsIDSet.Insert(int(one.UniqueID))
}
Expand All @@ -202,7 +202,7 @@ func (gss GroupingSets) TargetOne(normalAggArgs []Expression) int {
func (gss GroupingSets) NeedCloneColumn() bool {
// for grouping sets like: {<a,c>},{<c>} / {<a,c>},{<b,c>}
// the column c should be copied one more time here, otherwise it will be filled with null values and not visible for the other grouping set again.
setIDs := make([]*fd.FastIntSet, 0, len(gss))
setIDs := make([]*intset.FastIntSet, 0, len(gss))
for _, groupingSet := range gss {
setIDs = append(setIDs, groupingSet.AllColIDs())
}
Expand Down Expand Up @@ -231,8 +231,8 @@ func (gs GroupingSet) IsEmpty() bool {
}

// AllColIDs collect all the grouping col's uniqueID. (here assuming that all the grouping expressions are single col)
func (gs GroupingSet) AllColIDs() *fd.FastIntSet {
res := fd.NewFastIntSet()
func (gs GroupingSet) AllColIDs() *intset.FastIntSet {
res := intset.NewFastIntSet()
for _, groupingExprs := range gs {
// on the condition that every grouping expression is single column.
// eg: group by a, b, c
Expand Down Expand Up @@ -313,8 +313,8 @@ func (gss GroupingSets) IsEmpty() bool {
}

// AllSetsColIDs is used to collect all the column id inside into a fast int set.
func (gss GroupingSets) AllSetsColIDs() *fd.FastIntSet {
res := fd.NewFastIntSet()
func (gss GroupingSets) AllSetsColIDs() *intset.FastIntSet {
res := intset.NewFastIntSet()
for _, groupingSet := range gss {
res.UnionWith(*groupingSet.AllColIDs())
}
Expand Down Expand Up @@ -361,8 +361,8 @@ func (g GroupingExprs) IsEmpty() bool {

// SubSetOf is used to do the logical computation of subset between two grouping expressions.
func (g GroupingExprs) SubSetOf(other GroupingExprs) bool {
oldOne := fd.NewFastIntSet()
newOne := fd.NewFastIntSet()
oldOne := intset.NewFastIntSet()
newOne := intset.NewFastIntSet()
for _, one := range g {
oldOne.Insert(int(one.(*Column).UniqueID))
}
Expand All @@ -373,8 +373,8 @@ func (g GroupingExprs) SubSetOf(other GroupingExprs) bool {
}

// IDSet is used to collect column ids inside grouping expressions into a fast int set.
func (g GroupingExprs) IDSet() *fd.FastIntSet {
res := fd.NewFastIntSet()
func (g GroupingExprs) IDSet() *intset.FastIntSet {
res := intset.NewFastIntSet()
for _, one := range g {
res.Insert(int(one.(*Column).UniqueID))
}
Expand Down Expand Up @@ -493,7 +493,7 @@ func AdjustNullabilityFromGroupingSets(gss GroupingSets, schema *Schema) {
// set, so it won't be filled with null value at any time, the nullable change is unnecessary.
groupingIDs := gss.AllSetsColIDs()
// cache the grouping ids set to avoid fetch them multi times below.
groupingIDsSlice := make([]*fd.FastIntSet, 0, len(gss))
groupingIDsSlice := make([]*intset.FastIntSet, 0, len(gss))
for _, oneGroupingSet := range gss {
groupingIDsSlice = append(groupingIDsSlice, oneGroupingSet.AllColIDs())
}
Expand Down Expand Up @@ -570,7 +570,7 @@ func (gss GroupingSets) DistinctSize() (int, []uint64, map[int]map[uint64]struct
func (gss GroupingSets) DistinctSizeWithThreshold(N int) (int, []uint64, map[int]map[uint64]struct{}) {
// all the group by item are col, deduplicate from id-set.
distinctGroupingIDsPos := make([]int, 0, len(gss))
originGroupingIDsSlice := make([]*fd.FastIntSet, 0, len(gss))
originGroupingIDsSlice := make([]*intset.FastIntSet, 0, len(gss))

for _, oneGroupingSet := range gss {
curIDs := oneGroupingSet.AllColIDs()
Expand Down
10 changes: 5 additions & 5 deletions pkg/expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
driver "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/intset"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
"golang.org/x/tools/container/intsets"
)

// cowExprRef is a copy-on-write slice ref util using in `ColumnSubstitute`
Expand Down Expand Up @@ -372,15 +372,15 @@ func ExtractColumnsAndCorColumnsFromExpressions(result []*Column, list []Express
}

// ExtractColumnSet extracts the different values of `UniqueId` for columns in expressions.
func ExtractColumnSet(exprs ...Expression) *intsets.Sparse {
set := &intsets.Sparse{}
func ExtractColumnSet(exprs ...Expression) intset.FastIntSet {
set := intset.NewFastIntSet()
for _, expr := range exprs {
extractColumnSet(expr, set)
extractColumnSet(expr, &set)
}
return set
}

func extractColumnSet(expr Expression, set *intsets.Sparse) {
func extractColumnSet(expr Expression, set *intset.FastIntSet) {
switch v := expr.(type) {
case *Column:
set.Insert(int(v.UniqueID))
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ go_library(
"//pkg/util/hack",
"//pkg/util/hint",
"//pkg/util/intest",
"//pkg/util/intset",
"//pkg/util/kvcache",
"//pkg/util/logutil",
"//pkg/util/mathutil",
Expand Down
8 changes: 4 additions & 4 deletions pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package core
import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/funcdep"
"github.com/pingcap/tidb/pkg/util/intset"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -48,7 +48,7 @@ type columnStatsUsageCollector struct {

// visitedPhysTblIDs all ds.physicalTableID that have been visited.
// It's always collected, even collectHistNeededColumns is not set.
visitedPhysTblIDs *funcdep.FastIntSet
visitedPhysTblIDs *intset.FastIntSet

// collectVisitedTable indicates whether to collect visited table
collectVisitedTable bool
Expand All @@ -57,7 +57,7 @@ type columnStatsUsageCollector struct {
}

func newColumnStatsUsageCollector(collectMode uint64, enabledPlanCapture bool) *columnStatsUsageCollector {
set := funcdep.NewFastIntSet()
set := intset.NewFastIntSet()
collector := &columnStatsUsageCollector{
collectMode: collectMode,
// Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning.
Expand Down Expand Up @@ -309,7 +309,7 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp LogicalPlan) {
func CollectColumnStatsUsage(lp LogicalPlan, predicate, histNeeded bool) (
[]model.TableItemID,
[]model.TableItemID,
*funcdep.FastIntSet,
*intset.FastIntSet,
) {
var mode uint64
if predicate {
Expand Down
23 changes: 12 additions & 11 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/hint"
"github.com/pingcap/tidb/pkg/util/intset"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/pingcap/tidb/pkg/util/plancodec"
Expand Down Expand Up @@ -1869,7 +1870,7 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p LogicalPlan, fields
if fields[offset].AuxiliaryColInAgg {
continue
}
item := fd.NewFastIntSet()
item := intset.NewFastIntSet()
switch x := expr.(type) {
case *expression.Column:
item.Insert(int(x.UniqueID))
Expand Down Expand Up @@ -1911,7 +1912,7 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p LogicalPlan, fields
baseCols := expression.ExtractColumns(expr)
errShowCol := baseCols[0]
for _, col := range baseCols {
colSet := fd.NewFastIntSet(int(col.UniqueID))
colSet := intset.NewFastIntSet(int(col.UniqueID))
if !colSet.SubsetOf(strictClosure) {
errShowCol = col
break
Expand All @@ -1936,7 +1937,7 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p LogicalPlan, fields
}
if fds.GroupByCols.Only1Zero() {
// maxOneRow is delayed from agg's ExtractFD logic since some details listed in it.
projectionUniqueIDs := fd.NewFastIntSet()
projectionUniqueIDs := intset.NewFastIntSet()
for _, expr := range proj.Exprs {
switch x := expr.(type) {
case *expression.Column:
Expand Down Expand Up @@ -5438,15 +5439,15 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
// Once the all conditions are not equal to nil, built it again.
if ds.fdSet == nil || ds.allConds != nil {
fds := &fd.FDSet{HashCodeToUniqueID: make(map[string]int)}
allCols := fd.NewFastIntSet()
allCols := intset.NewFastIntSet()
// should use the column's unique ID avoiding fdSet conflict.
for _, col := range ds.TblCols {
// todo: change it to int64
allCols.Insert(int(col.UniqueID))
}
// int pk doesn't store its index column in indexInfo.
if ds.tableInfo.PKIsHandle {
keyCols := fd.NewFastIntSet()
keyCols := intset.NewFastIntSet()
for _, col := range ds.TblCols {
if mysql.HasPriKeyFlag(col.RetType.GetFlag()) {
keyCols.Insert(int(col.UniqueID))
Expand All @@ -5472,7 +5473,7 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
}
// other indices including common handle.
for _, idx := range ds.tableInfo.Indices {
keyCols := fd.NewFastIntSet()
keyCols := intset.NewFastIntSet()
allColIsNotNull := true
if ds.isForUpdateRead && changed {
latestIndex, ok := latestIndexes[idx.ID]
Expand Down Expand Up @@ -5531,14 +5532,14 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
// the generated column is sequentially dependent on the forward column.
// a int, b int as (a+1), c int as (b+1), here we can build the strict FD down:
// {a} -> {b}, {b} -> {c}, put the maintenance of the dependencies between generated columns to the FD graph.
notNullCols := fd.NewFastIntSet()
notNullCols := intset.NewFastIntSet()
for _, col := range ds.TblCols {
if col.VirtualExpr != nil {
dependencies := fd.NewFastIntSet()
dependencies := intset.NewFastIntSet()
dependencies.Insert(int(col.UniqueID))
// dig out just for 1 level.
directBaseCol := expression.ExtractColumns(col.VirtualExpr)
determinant := fd.NewFastIntSet()
determinant := intset.NewFastIntSet()
for _, col := range directBaseCol {
determinant.Insert(int(col.UniqueID))
}
Expand Down Expand Up @@ -6429,8 +6430,8 @@ func (b *PlanBuilder) buildUpdateLists(ctx context.Context, tableList []*ast.Tab
allAssignmentsAreConstant = false
}
p = np
if col, ok := newExpr.(*expression.Column); ok {
b.ctx.GetSessionVars().StmtCtx.ColRefFromUpdatePlan = append(b.ctx.GetSessionVars().StmtCtx.ColRefFromUpdatePlan, col.UniqueID)
if cols := expression.ExtractColumnSet(newExpr); cols.Len() > 0 {
b.ctx.GetSessionVars().StmtCtx.ColRefFromUpdatePlan.UnionWith(cols)
}
newList = append(newList, &expression.Assignment{Col: col, ColName: name.ColName, Expr: newExpr})
dbName := name.DBName.L
Expand Down
Loading

0 comments on commit eab1f8e

Please sign in to comment.