diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 297506380aae0..de6fcd8675f44 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -798,12 +798,15 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) exec.Executor end: v.Offset + v.Count, } - childUsedSchemaLen := v.Children()[0].Schema().Len() + childSchemaLen := v.Children()[0].Schema().Len() childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0] e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema)) e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, childUsedSchema...) - if len(e.columnIdxsUsedByChild) == childUsedSchemaLen { + if len(e.columnIdxsUsedByChild) == childSchemaLen { e.columnIdxsUsedByChild = nil // indicates that all columns are used. LimitExec will improve performance for this condition. + } else { + // construct a project evaluator to do the inline projection + e.columnSwapHelper = chunk.NewColumnSwapHelper(e.columnIdxsUsedByChild) } return e } diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 71feeb2086dc2..9da2a016aea25 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -1332,6 +1332,7 @@ type LimitExec struct { // columnIdxsUsedByChild keep column indexes of child executor used for inline projection columnIdxsUsedByChild []int + columnSwapHelper *chunk.ColumnSwapHelper // Log the close time when opentracing is enabled. span opentracing.Span @@ -1393,10 +1394,9 @@ func (e *LimitExec) Next(ctx context.Context, req *chunk.Chunk) error { e.cursor += batchSize if e.columnIdxsUsedByChild != nil { - for i, childIdx := range e.columnIdxsUsedByChild { - if err = req.SwapColumn(i, e.childResult, childIdx); err != nil { - return err - } + err = e.columnSwapHelper.SwapColumns(e.childResult, req) + if err != nil { + return err } } else { req.SwapColumns(e.childResult) diff --git a/pkg/expression/evaluator.go b/pkg/expression/evaluator.go index 9bef247ec9b32..00a8dc670db4f 100644 --- a/pkg/expression/evaluator.go +++ b/pkg/expression/evaluator.go @@ -15,126 +15,9 @@ package expression import ( - "sync/atomic" - "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/pingcap/tidb/pkg/util/disjointset" - "github.com/pingcap/tidb/pkg/util/intest" ) -type columnEvaluator struct { - inputIdxToOutputIdxes map[int][]int - // mergedInputIdxToOutputIdxes is only determined in runtime when saw the input chunk. - mergedInputIdxToOutputIdxes atomic.Pointer[map[int][]int] -} - -// run evaluates "Column" expressions. -// NOTE: It should be called after all the other expressions are evaluated -// -// since it will change the content of the input Chunk. -func (e *columnEvaluator) run(ctx EvalContext, input, output *chunk.Chunk) error { - // mergedInputIdxToOutputIdxes only can be determined in runtime when we saw the input chunk structure. - if e.mergedInputIdxToOutputIdxes.Load() == nil { - e.mergeInputIdxToOutputIdxes(input, e.inputIdxToOutputIdxes) - } - for inputIdx, outputIdxes := range *e.mergedInputIdxToOutputIdxes.Load() { - if err := output.SwapColumn(outputIdxes[0], input, inputIdx); err != nil { - return err - } - for i, length := 1, len(outputIdxes); i < length; i++ { - output.MakeRef(outputIdxes[0], outputIdxes[i]) - } - } - return nil -} - -// mergeInputIdxToOutputIdxes merges separate inputIdxToOutputIdxes entries when column references -// are detected within the input chunk. This process ensures consistent handling of columns derived -// from the same original source. -// -// Consider the following scenario: -// -// Initial scan operation produces a column 'a': -// -// scan: a (addr: ???) -// -// This column 'a' is used in the first projection (proj1) to create two columns a1 and a2, both referencing 'a': -// -// proj1 -// / \ -// / \ -// / \ -// a1 (addr: 0xe) a2 (addr: 0xe) -// / \ -// / \ -// / \ -// proj2 proj2 -// / \ / \ -// / \ / \ -// a3 a4 a5 a6 -// -// (addr: 0xe) (addr: 0xe) (addr: 0xe) (addr: 0xe) -// -// Here, a1 and a2 share the same address (0xe), indicating they reference the same data from the original 'a'. -// -// When moving to the second projection (proj2), the system tries to project these columns further: -// - The first set (left side) consists of a3 and a4, derived from a1, both retaining the address (0xe). -// - The second set (right side) consists of a5 and a6, derived from a2, also starting with address (0xe). -// -// When proj1 is complete, the output chunk contains two columns [a1, a2], both derived from the single column 'a' from the scan. -// Since both a1 and a2 are column references with the same address (0xe), they are treated as referencing the same data. -// -// In proj2, two separate items are created: -// - <0, [0,1]>: This means the 0th input column (a1) is projected twice, into the 0th and 1st columns of the output chunk. -// - <1, [2,3]>: This means the 1st input column (a2) is projected twice, into the 2nd and 3rd columns of the output chunk. -// -// Due to the column swapping logic in each projection, after applying the <0, [0,1]> projection, -// the addresses for a1 and a2 may become swapped or invalid: -// -// proj1: a1 (addr: invalid) a2 (addr: invalid) -// -// This can lead to issues in proj2, where further operations on these columns may be unsafe: -// -// proj2: a3 (addr: 0xe) a4 (addr: 0xe) a5 (addr: ???) a6 (addr: ???) -// -// Therefore, it's crucial to identify and merge the original column references early, ensuring -// the final inputIdxToOutputIdxes mapping accurately reflects the shared origins of the data. -// For instance, <0, [0,1,2,3]> indicates that the 0th input column (original 'a') is referenced -// by all four output columns in the final output. -// -// mergeInputIdxToOutputIdxes merges inputIdxToOutputIdxes based on detected column references. -// This ensures that columns with the same reference are correctly handled in the output chunk. -func (e *columnEvaluator) mergeInputIdxToOutputIdxes(input *chunk.Chunk, inputIdxToOutputIdxes map[int][]int) { - originalDJSet := disjointset.NewSet[int](4) - flag := make([]bool, input.NumCols()) - // Detect self column-references inside the input chunk by comparing column addresses - for i := 0; i < input.NumCols(); i++ { - if flag[i] { - continue - } - for j := i + 1; j < input.NumCols(); j++ { - if input.Column(i) == input.Column(j) { - flag[j] = true - originalDJSet.Union(i, j) - } - } - } - // Merge inputIdxToOutputIdxes based on the detected column references. - newInputIdxToOutputIdxes := make(map[int][]int, len(inputIdxToOutputIdxes)) - for inputIdx := range inputIdxToOutputIdxes { - // Root idx is internal offset, not the right column index. - originalRootIdx := originalDJSet.FindRoot(inputIdx) - originalVal, ok := originalDJSet.FindVal(originalRootIdx) - intest.Assert(ok) - mergedOutputIdxes := newInputIdxToOutputIdxes[originalVal] - mergedOutputIdxes = append(mergedOutputIdxes, inputIdxToOutputIdxes[inputIdx]...) - newInputIdxToOutputIdxes[originalVal] = mergedOutputIdxes - } - // Update the merged inputIdxToOutputIdxes automatically. - // Once failed, it means other worker has done this job at meantime. - e.mergedInputIdxToOutputIdxes.CompareAndSwap(nil, &newInputIdxToOutputIdxes) -} - type defaultEvaluator struct { outputIdxes []int exprs []Expression @@ -175,8 +58,8 @@ func (e *defaultEvaluator) run(ctx EvalContext, vecEnabled bool, input, output * // It separates them to "column" and "other" expressions and evaluates "other" // expressions before "column" expressions. type EvaluatorSuite struct { - *columnEvaluator // Evaluator for column expressions. - *defaultEvaluator // Evaluator for other expressions. + ColumnSwapHelper *chunk.ColumnSwapHelper // Evaluator for column expressions. + *defaultEvaluator // Evaluator for other expressions. } // NewEvaluatorSuite creates an EvaluatorSuite to evaluate all the exprs. @@ -186,11 +69,11 @@ func NewEvaluatorSuite(exprs []Expression, avoidColumnEvaluator bool) *Evaluator for i := 0; i < len(exprs); i++ { if col, isCol := exprs[i].(*Column); isCol && !avoidColumnEvaluator { - if e.columnEvaluator == nil { - e.columnEvaluator = &columnEvaluator{inputIdxToOutputIdxes: make(map[int][]int)} + if e.ColumnSwapHelper == nil { + e.ColumnSwapHelper = &chunk.ColumnSwapHelper{InputIdxToOutputIdxes: make(map[int][]int)} } inputIdx, outputIdx := col.Index, i - e.columnEvaluator.inputIdxToOutputIdxes[inputIdx] = append(e.columnEvaluator.inputIdxToOutputIdxes[inputIdx], outputIdx) + e.ColumnSwapHelper.InputIdxToOutputIdxes[inputIdx] = append(e.ColumnSwapHelper.InputIdxToOutputIdxes[inputIdx], outputIdx) continue } if e.defaultEvaluator == nil { @@ -224,8 +107,10 @@ func (e *EvaluatorSuite) Run(ctx EvalContext, vecEnabled bool, input, output *ch } } - if e.columnEvaluator != nil { - return e.columnEvaluator.run(ctx, input, output) + // NOTE: It should be called after all the other expressions are evaluated + // since it will change the content of the input Chunk. + if e.ColumnSwapHelper != nil { + return e.ColumnSwapHelper.SwapColumns(input, output) } return nil } diff --git a/pkg/expression/evaluator_test.go b/pkg/expression/evaluator_test.go index e77279f4e318e..b9a08f3c264fe 100644 --- a/pkg/expression/evaluator_test.go +++ b/pkg/expression/evaluator_test.go @@ -15,7 +15,6 @@ package expression import ( - "slices" "testing" "time" @@ -607,42 +606,3 @@ func TestMod(t *testing.T) { require.NoError(t, err) require.Equal(t, types.NewDatum(1.5), r) } - -func TestMergeInputIdxToOutputIdxes(t *testing.T) { - ctx := createContext(t) - inputIdxToOutputIdxes := make(map[int][]int) - // input 0th should be column referred as 0th and 1st in output columns. - inputIdxToOutputIdxes[0] = []int{0, 1} - // input 1th should be column referred as 2nd and 3rd in output columns. - inputIdxToOutputIdxes[1] = []int{2, 3} - columnEval := columnEvaluator{inputIdxToOutputIdxes: inputIdxToOutputIdxes} - - input := chunk.NewEmptyChunk([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}) - input.AppendInt64(0, 99) - // input chunk's 0th and 1st are column referred itself. - input.MakeRef(0, 1) - - // chunk: col1 <---(ref) col2 - // ____________/ \___________/ \___ - // proj: col1 col2 col3 col4 - // - // original case after inputIdxToOutputIdxes[0], the original col2 will be nil pointer - // cause consecutive col3,col4 ref projection are invalid. - // - // after fix, the new inputIdxToOutputIdxes should be: inputIdxToOutputIdxes[0]: {0, 1, 2, 3} - - output := chunk.NewEmptyChunk([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong), - types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}) - - err := columnEval.run(ctx, input, output) - require.NoError(t, err) - // all four columns are column-referred, pointing to the first one. - require.Equal(t, output.Column(0), output.Column(1)) - require.Equal(t, output.Column(1), output.Column(2)) - require.Equal(t, output.Column(2), output.Column(3)) - require.Equal(t, output.GetRow(0).GetInt64(0), int64(99)) - - require.Equal(t, len(*columnEval.mergedInputIdxToOutputIdxes.Load()), 1) - slices.Sort((*columnEval.mergedInputIdxToOutputIdxes.Load())[0]) - require.Equal(t, (*columnEval.mergedInputIdxToOutputIdxes.Load())[0], []int{0, 1, 2, 3}) -} diff --git a/pkg/expression/integration_test/BUILD.bazel b/pkg/expression/integration_test/BUILD.bazel index b7b517b8762fb..eab016ca24fa1 100644 --- a/pkg/expression/integration_test/BUILD.bazel +++ b/pkg/expression/integration_test/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 25, + shard_count = 26, deps = [ "//pkg/config", "//pkg/domain", diff --git a/pkg/expression/integration_test/integration_test.go b/pkg/expression/integration_test/integration_test.go index 889d97d9545f4..3ca9ca44e833c 100644 --- a/pkg/expression/integration_test/integration_test.go +++ b/pkg/expression/integration_test/integration_test.go @@ -3002,3 +3002,52 @@ func TestIssue43527(t *testing.T) { "SELECT @total := @total + d FROM (SELECT d FROM test) AS temp, (SELECT @total := b FROM test) AS T1 where @total >= 100", ).Check(testkit.Rows("200", "300", "400", "500")) } + +func TestIssue55885(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t_jg8o (c_s int not null unique ,c__qy double ,c_z int not null ,c_a90ol text not null);") + tk.MustExec("insert into t_jg8o (c_s, c__qy, c_z, c_a90ol) values" + + "(-975033779, 85.65, -355481284, 'gnip' ),(-2018599732, 85.86, 1617093413, 'm' )," + + "(-960107027, 4.6, -2042358076, 'y1q')," + + "(-3, 38.1, -1528586343, 'ex_2')," + + "(69386953, 32768.0, -62220810, 'tfkxjj5c')," + + "(587181689, -9223372036854775806.3, -1666156943, 'queemvgj')," + + "(-218561796, 85.2, -670390288, 'nf990nol')," + + "(858419954, 2147483646.0, -1649362344, 'won_9')," + + "(-1120115215, 22.100, 1509989939, 'w')," + + "(-1388119356, 94.32, -1694148464, 'gu4i4knyhm')," + + "(-1016230734, -4294967295.8, 1430313391, 's')," + + "(-1861825796, 36.52, -1457928755, 'j')," + + "(1963621165, 88.87, 18928603, 'gxbsloff' )," + + "(1492879828, cast(null as double), 759883041, 'zwue')," + + "(-1607192175, 12.36, 1669523024, 'qt5zch71a')," + + "(1534068569, 46.79, -392085130, 'bc')," + + "(155707446, 9223372036854775809.4, 1727199557, 'qyghenu9t6')," + + "(-1524976778, 75.99, 335492222, 'sdgde0z')," + + "(175403335, cast(null as double), -69711503, 'ja')," + + "(-272715456, 48.62, 753928713, 'ur')," + + "(-2035825967, 257.3, -1598426762, 'lmqmn')," + + "(-1178957955, 2147483648.100000, 1432554380, 'dqpb210')," + + "(-2056628646, 254.5, -1476177588, 'k41ajpt7x')," + + "(-914210874, 126.7, -421919910, 'x57ud7oy1')," + + "(-88586773, 1.2, 1568247510, 'drmxi8')," + + "(-834563269, -4294967296.7, 1163133933, 'wp')," + + "(-84490060, 54.13, -630289437, '_3_twecg5h')," + + " (267700893, 54.75, 370343042, 'n72')," + + "(552106333, 32766.2, 2365745, 's7tt')," + + "(643440707, 65536.8, -850412592, 'wmluxa9a')," + + "(1709853766, -4294967296.5, -21041749, 'obqj0uu5v')," + + "(-7, 80.88, 528792379, 'n5qr9m26i')," + + "(-456431629, 28.43, 1958788149, 'b')," + + "(-28841240, 11.86, -1089765168, 'pqg')," + + "(-807839288, 25.89, 504535500, 'cs3tkhs')," + + "(-52910064, 85.16, 354032882, '_ffjo67yxe')," + + "(1919869830, 81.81, -272247558, 'aj')," + + "(165434725, -2147483648.0, 11, 'xxnsf5')," + + "(3, -2147483648.7, 1616632952, 'g7t8tqyi')," + + "(1851859144, 70.73, -1105664209, 'qjfhjr');") + + tk.MustQuery("SELECT subq_0.c3 as c1 FROM (select c_a90ol as c3, c_a90ol as c4, var_pop(cast(c__qy as double)) over (partition by c_a90ol, c_s order by c_z) as c5 from t_jg8o limit 65) as subq_0 LIMIT 37") +} diff --git a/pkg/util/chunk/BUILD.bazel b/pkg/util/chunk/BUILD.bazel index 7f3cfca2a24a6..0d8a6e96c52bc 100644 --- a/pkg/util/chunk/BUILD.bazel +++ b/pkg/util/chunk/BUILD.bazel @@ -27,9 +27,11 @@ go_library( "//pkg/parser/terror", "//pkg/types", "//pkg/util/checksum", + "//pkg/util/disjointset", "//pkg/util/disk", "//pkg/util/encrypt", "//pkg/util/hack", + "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/memory", "//pkg/util/syncutil", diff --git a/pkg/util/chunk/chunk.go b/pkg/util/chunk/chunk.go index 03f2abd0b2ea1..b0070c36b5c6c 100644 --- a/pkg/util/chunk/chunk.go +++ b/pkg/util/chunk/chunk.go @@ -215,10 +215,12 @@ func (c *Chunk) MakeRefTo(dstColIdx int, src *Chunk, srcColIdx int) error { return nil } -// SwapColumn swaps Column "c.columns[colIdx]" with Column +// swapColumn swaps Column "c.columns[colIdx]" with Column // "other.columns[otherIdx]". If there exists columns refer to the Column to be // swapped, we need to re-build the reference. -func (c *Chunk) SwapColumn(colIdx int, other *Chunk, otherIdx int) error { +// this function should not be used directly, if you wants to swap columns between two chunks, +// use ColumnSwapHelper.SwapColumns instead. +func (c *Chunk) swapColumn(colIdx int, other *Chunk, otherIdx int) error { if c.sel != nil || other.sel != nil { return errors.New(msgErrSelNotNil) } diff --git a/pkg/util/chunk/chunk_test.go b/pkg/util/chunk/chunk_test.go index 690f831001597..18d6a0c64fffa 100644 --- a/pkg/util/chunk/chunk_test.go +++ b/pkg/util/chunk/chunk_test.go @@ -640,25 +640,25 @@ func TestSwapColumn(t *testing.T) { checkRef() // swap two chunk's columns - require.NoError(t, chk1.SwapColumn(0, chk2, 0)) + require.NoError(t, chk1.swapColumn(0, chk2, 0)) checkRef() - require.NoError(t, chk1.SwapColumn(0, chk2, 0)) + require.NoError(t, chk1.swapColumn(0, chk2, 0)) checkRef() // swap reference and referenced columns - require.NoError(t, chk2.SwapColumn(1, chk2, 0)) + require.NoError(t, chk2.swapColumn(1, chk2, 0)) checkRef() // swap the same column in the same chunk - require.NoError(t, chk2.SwapColumn(1, chk2, 1)) + require.NoError(t, chk2.swapColumn(1, chk2, 1)) checkRef() // swap reference and another column - require.NoError(t, chk2.SwapColumn(1, chk2, 2)) + require.NoError(t, chk2.swapColumn(1, chk2, 2)) checkRef() - require.NoError(t, chk2.SwapColumn(2, chk2, 0)) + require.NoError(t, chk2.swapColumn(2, chk2, 0)) checkRef() } diff --git a/pkg/util/chunk/chunk_util.go b/pkg/util/chunk/chunk_util.go index b0157f2b2bdc7..8761aab7e2c21 100644 --- a/pkg/util/chunk/chunk_util.go +++ b/pkg/util/chunk/chunk_util.go @@ -17,11 +17,14 @@ package chunk import ( "io" "os" + "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/util/checksum" + "github.com/pingcap/tidb/pkg/util/disjointset" "github.com/pingcap/tidb/pkg/util/encrypt" + "github.com/pingcap/tidb/pkg/util/intest" ) // CopySelectedJoinRowsDirect directly copies the selected joined rows from the source Chunk @@ -237,3 +240,125 @@ func (l *diskFileReaderWriter) write(writeData []byte) (n int, err error) { l.offWrite += int64(writeNum) return writeNum, err } + +// ColumnSwapHelper is used to help swap columns in a chunk. +type ColumnSwapHelper struct { + // InputIdxToOutputIdxes maps the input column index to the output column indexes. + InputIdxToOutputIdxes map[int][]int + // mergedInputIdxToOutputIdxes is only determined in runtime when saw the input chunk. + mergedInputIdxToOutputIdxes atomic.Pointer[map[int][]int] +} + +// SwapColumns evaluates "Column" expressions. +// it will change the content of the input Chunk. +func (helper *ColumnSwapHelper) SwapColumns(input, output *Chunk) error { + // mergedInputIdxToOutputIdxes only can be determined in runtime when we saw the input chunk structure. + if helper.mergedInputIdxToOutputIdxes.Load() == nil { + helper.mergeInputIdxToOutputIdxes(input, helper.InputIdxToOutputIdxes) + } + for inputIdx, outputIdxes := range *helper.mergedInputIdxToOutputIdxes.Load() { + if err := output.swapColumn(outputIdxes[0], input, inputIdx); err != nil { + return err + } + for i, length := 1, len(outputIdxes); i < length; i++ { + output.MakeRef(outputIdxes[0], outputIdxes[i]) + } + } + return nil +} + +// mergeInputIdxToOutputIdxes merges separate inputIdxToOutputIdxes entries when column references +// are detected within the input chunk. This process ensures consistent handling of columns derived +// from the same original source. +// +// Consider the following scenario: +// +// Initial scan operation produces a column 'a': +// +// scan: a (addr: ???) +// +// This column 'a' is used in the first projection (proj1) to create two columns a1 and a2, both referencing 'a': +// +// proj1 +// / \ +// / \ +// / \ +// a1 (addr: 0xe) a2 (addr: 0xe) +// / \ +// / \ +// / \ +// proj2 proj2 +// / \ / \ +// / \ / \ +// a3 a4 a5 a6 +// +// (addr: 0xe) (addr: 0xe) (addr: 0xe) (addr: 0xe) +// +// Here, a1 and a2 share the same address (0xe), indicating they reference the same data from the original 'a'. +// +// When moving to the second projection (proj2), the system tries to project these columns further: +// - The first set (left side) consists of a3 and a4, derived from a1, both retaining the address (0xe). +// - The second set (right side) consists of a5 and a6, derived from a2, also starting with address (0xe). +// +// When proj1 is complete, the output chunk contains two columns [a1, a2], both derived from the single column 'a' from the scan. +// Since both a1 and a2 are column references with the same address (0xe), they are treated as referencing the same data. +// +// In proj2, two separate items are created: +// - <0, [0,1]>: This means the 0th input column (a1) is projected twice, into the 0th and 1st columns of the output chunk. +// - <1, [2,3]>: This means the 1st input column (a2) is projected twice, into the 2nd and 3rd columns of the output chunk. +// +// Due to the column swapping logic in each projection, after applying the <0, [0,1]> projection, +// the addresses for a1 and a2 may become swapped or invalid: +// +// proj1: a1 (addr: invalid) a2 (addr: invalid) +// +// This can lead to issues in proj2, where further operations on these columns may be unsafe: +// +// proj2: a3 (addr: 0xe) a4 (addr: 0xe) a5 (addr: ???) a6 (addr: ???) +// +// Therefore, it's crucial to identify and merge the original column references early, ensuring +// the final inputIdxToOutputIdxes mapping accurately reflects the shared origins of the data. +// For instance, <0, [0,1,2,3]> indicates that the 0th input column (original 'a') is referenced +// by all four output columns in the final output. +// +// mergeInputIdxToOutputIdxes merges inputIdxToOutputIdxes based on detected column references. +// This ensures that columns with the same reference are correctly handled in the output chunk. +func (helper *ColumnSwapHelper) mergeInputIdxToOutputIdxes(input *Chunk, inputIdxToOutputIdxes map[int][]int) { + originalDJSet := disjointset.NewSet[int](4) + flag := make([]bool, input.NumCols()) + // Detect self column-references inside the input chunk by comparing column addresses + for i := 0; i < input.NumCols(); i++ { + if flag[i] { + continue + } + for j := i + 1; j < input.NumCols(); j++ { + if input.Column(i) == input.Column(j) { + flag[j] = true + originalDJSet.Union(i, j) + } + } + } + // Merge inputIdxToOutputIdxes based on the detected column references. + newInputIdxToOutputIdxes := make(map[int][]int, len(inputIdxToOutputIdxes)) + for inputIdx := range inputIdxToOutputIdxes { + // Root idx is internal offset, not the right column index. + originalRootIdx := originalDJSet.FindRoot(inputIdx) + originalVal, ok := originalDJSet.FindVal(originalRootIdx) + intest.Assert(ok) + mergedOutputIdxes := newInputIdxToOutputIdxes[originalVal] + mergedOutputIdxes = append(mergedOutputIdxes, inputIdxToOutputIdxes[inputIdx]...) + newInputIdxToOutputIdxes[originalVal] = mergedOutputIdxes + } + // Update the merged inputIdxToOutputIdxes automatically. + // Once failed, it means other worker has done this job at meantime. + helper.mergedInputIdxToOutputIdxes.CompareAndSwap(nil, &newInputIdxToOutputIdxes) +} + +// NewColumnSwapHelper creates a new ColumnSwapHelper. +func NewColumnSwapHelper(usedColumnIndex []int) *ColumnSwapHelper { + helper := &ColumnSwapHelper{InputIdxToOutputIdxes: make(map[int][]int)} + for outputIndex, inputIndex := range usedColumnIndex { + helper.InputIdxToOutputIdxes[inputIndex] = append(helper.InputIdxToOutputIdxes[inputIndex], outputIndex) + } + return helper +} diff --git a/pkg/util/chunk/chunk_util_test.go b/pkg/util/chunk/chunk_util_test.go index 32614ada18703..453c9cdcf28c2 100644 --- a/pkg/util/chunk/chunk_util_test.go +++ b/pkg/util/chunk/chunk_util_test.go @@ -17,8 +17,10 @@ package chunk import ( "math/rand" "reflect" + "slices" "testing" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/types" "github.com/stretchr/testify/require" ) @@ -219,3 +221,41 @@ func BenchmarkAppendSelectedRow(b *testing.B) { } } } + +func TestMergeInputIdxToOutputIdxes(t *testing.T) { + inputIdxToOutputIdxes := make(map[int][]int) + // input 0th should be column referred as 0th and 1st in output columns. + inputIdxToOutputIdxes[0] = []int{0, 1} + // input 1th should be column referred as 2nd and 3rd in output columns. + inputIdxToOutputIdxes[1] = []int{2, 3} + columnEval := ColumnSwapHelper{InputIdxToOutputIdxes: inputIdxToOutputIdxes} + + input := NewEmptyChunk([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}) + input.AppendInt64(0, 99) + // input chunk's 0th and 1st are column referred itself. + input.MakeRef(0, 1) + + // chunk: col1 <---(ref) col2 + // ____________/ \___________/ \___ + // proj: col1 col2 col3 col4 + // + // original case after inputIdxToOutputIdxes[0], the original col2 will be nil pointer + // cause consecutive col3,col4 ref projection are invalid. + // + // after fix, the new inputIdxToOutputIdxes should be: inputIdxToOutputIdxes[0]: {0, 1, 2, 3} + + output := NewEmptyChunk([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}) + + err := columnEval.SwapColumns(input, output) + require.NoError(t, err) + // all four columns are column-referred, pointing to the first one. + require.Equal(t, output.Column(0), output.Column(1)) + require.Equal(t, output.Column(1), output.Column(2)) + require.Equal(t, output.Column(2), output.Column(3)) + require.Equal(t, output.GetRow(0).GetInt64(0), int64(99)) + + require.Equal(t, len(*columnEval.mergedInputIdxToOutputIdxes.Load()), 1) + slices.Sort((*columnEval.mergedInputIdxToOutputIdxes.Load())[0]) + require.Equal(t, (*columnEval.mergedInputIdxToOutputIdxes.Load())[0], []int{0, 1, 2, 3}) +}