Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Support TopN Spill #51809

Merged
merged 289 commits into from
May 15, 2024
Merged
Changes from 1 commit
Commits
Show all changes
289 commits
Select commit Hold shift + click to select a range
65d4fdf
bug needs fix
xzhangxian1008 Dec 4, 2023
edc5985
add some tests
xzhangxian1008 Dec 5, 2023
312e1c0
pass error from spill
xzhangxian1008 Dec 5, 2023
a754672
tweaking
xzhangxian1008 Dec 5, 2023
aa12e76
refine
xzhangxian1008 Dec 5, 2023
a871782
fix ci
xzhangxian1008 Dec 6, 2023
bb71717
create file
xzhangxian1008 Dec 7, 2023
285b5fb
address comments
xzhangxian1008 Dec 7, 2023
936ee2a
update bazel
xzhangxian1008 Dec 7, 2023
50269fe
refactor
xzhangxian1008 Dec 7, 2023
656eec1
add todo
xzhangxian1008 Dec 7, 2023
be6b18d
save
xzhangxian1008 Dec 7, 2023
540db7d
tweaking
xzhangxian1008 Dec 8, 2023
403c54c
save
xzhangxian1008 Dec 8, 2023
ae18b03
save
xzhangxian1008 Dec 8, 2023
af1e6c3
add some tests
xzhangxian1008 Dec 11, 2023
4fedd88
save
xzhangxian1008 Dec 11, 2023
20ac1b3
update
xzhangxian1008 Dec 12, 2023
08eb30d
fix bugs
xzhangxian1008 Dec 13, 2023
4c7738f
merge
xzhangxian1008 Dec 13, 2023
7e5b4a2
tweaking
xzhangxian1008 Dec 13, 2023
e216c7f
add todo
xzhangxian1008 Dec 13, 2023
17866cf
delete originOnExceed
xzhangxian1008 Dec 13, 2023
1d3278c
tweaking
xzhangxian1008 Dec 14, 2023
10ae250
unconsume all
xzhangxian1008 Dec 14, 2023
58bd4f6
tweaking
xzhangxian1008 Dec 14, 2023
6246f55
uncomment
xzhangxian1008 Dec 14, 2023
103ba0d
merge
xzhangxian1008 Dec 14, 2023
63a13c9
tweaking
xzhangxian1008 Dec 14, 2023
8291566
fix mpmcqueue and add tests
xzhangxian1008 Dec 15, 2023
1e351e8
fix bugs
xzhangxian1008 Dec 15, 2023
33626e3
address comments
xzhangxian1008 Dec 15, 2023
90720f4
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Dec 29, 2023
d901a47
address comment
xzhangxian1008 Dec 29, 2023
349430d
fix ci
xzhangxian1008 Dec 29, 2023
25cb181
put some logic into sort partition
xzhangxian1008 Dec 29, 2023
6d0b1d0
add todo
xzhangxian1008 Dec 29, 2023
729d7af
address comment
xzhangxian1008 Jan 3, 2024
a8c81b4
tweaking
xzhangxian1008 Jan 3, 2024
389f545
remove savedChunks
xzhangxian1008 Jan 3, 2024
85d03d8
remove useless codes
xzhangxian1008 Jan 3, 2024
3681dec
fix bug
xzhangxian1008 Jan 3, 2024
77eae04
remove useless field
xzhangxian1008 Jan 3, 2024
7ccad75
remove spillHelper
xzhangxian1008 Jan 3, 2024
c898a59
add tests
xzhangxian1008 Jan 4, 2024
42e6c4c
tweaking
xzhangxian1008 Jan 4, 2024
e1a590f
address comment
xzhangxian1008 Jan 4, 2024
5b740a2
tweaking
xzhangxian1008 Jan 4, 2024
5991094
fix analyze
xzhangxian1008 Jan 4, 2024
7a97bd2
tweaking
xzhangxian1008 Jan 4, 2024
cfdc9d3
fix ut
xzhangxian1008 Jan 4, 2024
caed9ca
fix ci
xzhangxian1008 Jan 4, 2024
380020b
tweaking
xzhangxian1008 Jan 4, 2024
5293652
tweaking
xzhangxian1008 Jan 4, 2024
6c4d00b
change to chunk iter
xzhangxian1008 Jan 4, 2024
2458060
add comment
xzhangxian1008 Jan 4, 2024
2498c89
aa
xzhangxian1008 Jan 4, 2024
14d9445
address comment
xzhangxian1008 Jan 4, 2024
9cf5935
a
xzhangxian1008 Jan 5, 2024
baab197
tweaking
xzhangxian1008 Jan 5, 2024
13e6079
fix ut
xzhangxian1008 Jan 5, 2024
915c4d0
remove useless function
xzhangxian1008 Jan 5, 2024
bccf964
update bazel
xzhangxian1008 Jan 5, 2024
9b5b872
fix ut
xzhangxian1008 Jan 5, 2024
58ea1c9
a
xzhangxian1008 Jan 5, 2024
fa8f62b
merge master
xzhangxian1008 Jan 5, 2024
dad50fb
merge sort refine
xzhangxian1008 Jan 5, 2024
3ee06b5
refine ut
xzhangxian1008 Jan 5, 2024
0e44add
replace with faster sort function
xzhangxian1008 Jan 8, 2024
3c0830c
refine
xzhangxian1008 Jan 8, 2024
d768973
merge
xzhangxian1008 Jan 9, 2024
8eb2791
switch code position
xzhangxian1008 Jan 9, 2024
1f2dbff
tweaking
xzhangxian1008 Jan 9, 2024
e48899d
update bazel
xzhangxian1008 Jan 9, 2024
f076e5b
save
xzhangxian1008 Jan 9, 2024
e62771d
move sortedRowsList into SortExec
xzhangxian1008 Jan 9, 2024
fcb3d50
Merge branch 'parallel-sort' into parallel-sort-spill
xzhangxian1008 Jan 9, 2024
336a9ba
save
xzhangxian1008 Jan 10, 2024
f6e85a4
remove local queue
xzhangxian1008 Jan 10, 2024
28b4abc
merge
xzhangxian1008 Jan 10, 2024
e61e7fd
refine
xzhangxian1008 Jan 10, 2024
a76d35a
refine
xzhangxian1008 Jan 10, 2024
32171fa
merge
xzhangxian1008 Jan 10, 2024
b55c0f2
replace mpmcqueue with channel
xzhangxian1008 Jan 10, 2024
53afd3f
update bazel
xzhangxian1008 Jan 10, 2024
14df456
merge
xzhangxian1008 Jan 10, 2024
35e5984
fix ci
xzhangxian1008 Jan 11, 2024
22e17e7
add failpoint
xzhangxian1008 Jan 11, 2024
c729a68
add comment and complete todo
xzhangxian1008 Jan 11, 2024
c643fef
fix ci
xzhangxian1008 Jan 11, 2024
21977f8
fix comment
xzhangxian1008 Jan 11, 2024
788c450
merge
xzhangxian1008 Jan 12, 2024
186ed7d
save
xzhangxian1008 Jan 12, 2024
1ce71f4
address comment
xzhangxian1008 Jan 16, 2024
34083c8
tweaking
xzhangxian1008 Jan 16, 2024
cda09d3
tweaking
xzhangxian1008 Jan 16, 2024
c5f6a4f
tweaking
xzhangxian1008 Jan 16, 2024
f8ce53b
merge
xzhangxian1008 Jan 16, 2024
2125940
refine
xzhangxian1008 Jan 16, 2024
d9e6491
merge branch parallel_sort
xzhangxian1008 Jan 16, 2024
14a801b
update
xzhangxian1008 Jan 16, 2024
97147d5
add finishCh
xzhangxian1008 Jan 16, 2024
fc97856
address comment
xzhangxian1008 Jan 17, 2024
8527e90
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Jan 17, 2024
eff836e
merge
xzhangxian1008 Jan 17, 2024
96a62ca
fix bug
xzhangxian1008 Jan 17, 2024
195133f
codes done, need tests
xzhangxian1008 Jan 17, 2024
e3e53fd
remove useless codes and udpate bazel
xzhangxian1008 Jan 17, 2024
ac05f1e
address some comments
xzhangxian1008 Jan 17, 2024
357718f
tweaking
xzhangxian1008 Jan 17, 2024
6058b30
address comment
xzhangxian1008 Jan 18, 2024
7580262
merge parallel-sort branch
xzhangxian1008 Jan 18, 2024
8acd6c0
ready to test
xzhangxian1008 Jan 18, 2024
34eff5e
tweaking
xzhangxian1008 Jan 18, 2024
c67b9c6
save
xzhangxian1008 Jan 18, 2024
fc4b727
refine close
xzhangxian1008 Jan 18, 2024
d894a3a
tweaking
xzhangxian1008 Jan 18, 2024
102d969
merge parallel-sort branch
xzhangxian1008 Jan 18, 2024
ad98c71
fix bug
xzhangxian1008 Jan 19, 2024
afd9a8c
refactor parallel sort
xzhangxian1008 Jan 21, 2024
f6fb1b0
remove useless codes
xzhangxian1008 Jan 21, 2024
5d10251
tweaking
xzhangxian1008 Jan 22, 2024
ef696ff
remove useless comment
xzhangxian1008 Jan 22, 2024
229a4b8
remove todo
xzhangxian1008 Jan 22, 2024
8c258ab
move initKWayMerge
xzhangxian1008 Jan 22, 2024
a369c56
tweaking
xzhangxian1008 Jan 23, 2024
c07f634
fix ci
xzhangxian1008 Jan 23, 2024
65c9a6f
merge
xzhangxian1008 Jan 23, 2024
3b0755a
fix ci
xzhangxian1008 Jan 24, 2024
757f88d
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Jan 24, 2024
4a0c6c8
merge
xzhangxian1008 Jan 24, 2024
307ec06
save
xzhangxian1008 Jan 24, 2024
5b8d58b
save
xzhangxian1008 Jan 24, 2024
44d13fe
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Jan 24, 2024
2408efe
tweaking
xzhangxian1008 Jan 24, 2024
3a1d18b
refine
xzhangxian1008 Jan 24, 2024
46e9e92
refactor and not done
xzhangxian1008 Jan 26, 2024
81e48e1
remove useless codes
xzhangxian1008 Jan 26, 2024
c5538e1
update bazel
xzhangxian1008 Jan 29, 2024
143c7cb
update introduction
xzhangxian1008 Jan 29, 2024
4c036e7
merge master
xzhangxian1008 Jan 29, 2024
bce3684
merge
xzhangxian1008 Jan 31, 2024
7ca7908
fix
xzhangxian1008 Jan 31, 2024
d76d5e2
tweaking
xzhangxian1008 Jan 31, 2024
9ef960a
codes done
xzhangxian1008 Jan 31, 2024
59f39ba
tweaking
xzhangxian1008 Jan 31, 2024
fb85d8b
fixc
xzhangxian1008 Jan 31, 2024
229a53a
fix bugs
xzhangxian1008 Feb 1, 2024
28831d3
tweaking
xzhangxian1008 Feb 1, 2024
741f2e7
set finished
xzhangxian1008 Feb 2, 2024
756cf86
fix bugs and add tests
xzhangxian1008 Feb 2, 2024
9160f7e
tweaking
xzhangxian1008 Feb 4, 2024
59b5e46
tweaking
xzhangxian1008 Feb 4, 2024
b1d9faf
address comment
xzhangxian1008 Feb 4, 2024
8d849c3
merge
xzhangxian1008 Feb 4, 2024
5c9acfc
merge
xzhangxian1008 Feb 4, 2024
45f1add
tweaking
xzhangxian1008 Feb 4, 2024
2ffbff3
ready to add random failpoint tests
xzhangxian1008 Feb 6, 2024
2923d6f
pull remote master
xzhangxian1008 Feb 6, 2024
ed3f9b2
fix bugs and refine tests
xzhangxian1008 Feb 6, 2024
df58e9a
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Feb 6, 2024
83a4e61
refine test
xzhangxian1008 Feb 6, 2024
cb7121e
merge
xzhangxian1008 Feb 6, 2024
840caa3
add random failpoint tests
xzhangxian1008 Feb 6, 2024
0afe75e
refine
xzhangxian1008 Feb 23, 2024
731215b
Merge branch 'master' of https://github.com/pingcap/tidb into topn-spill
xzhangxian1008 Feb 23, 2024
325a141
merge
xzhangxian1008 Feb 27, 2024
c72cd32
uncomment tests
xzhangxian1008 Feb 27, 2024
817fd04
tweaking
xzhangxian1008 Feb 27, 2024
da987bb
merge
xzhangxian1008 Feb 28, 2024
9747e8b
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Feb 28, 2024
507f14c
refine topn
xzhangxian1008 Feb 28, 2024
4519bcf
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Feb 28, 2024
037da8f
Merge branch 'master' of https://github.com/pingcap/tidb into topn-spill
xzhangxian1008 Feb 28, 2024
33802d6
Merge branch 'parallel-sort-spill' into topn-spill
xzhangxian1008 Feb 28, 2024
475ebb9
add fallback
xzhangxian1008 Feb 28, 2024
c57728f
Merge branch 'parallel-sort-spill' into topn-spill
xzhangxian1008 Feb 29, 2024
c6bfbac
save
xzhangxian1008 Feb 29, 2024
88bf1d1
save
xzhangxian1008 Mar 1, 2024
96631c6
save
xzhangxian1008 Mar 6, 2024
fc6b90b
fix diskTracker's incorrect type
xzhangxian1008 Mar 7, 2024
5c68f75
save
xzhangxian1008 Mar 7, 2024
1006752
tweaking
xzhangxian1008 Mar 7, 2024
d5271d5
Merge branch 'master' of https://github.com/pingcap/tidb into topn-spill
xzhangxian1008 Mar 8, 2024
234da14
save
xzhangxian1008 Mar 8, 2024
09605dc
tweaking
xzhangxian1008 Mar 8, 2024
1fb9111
Merge branch 'parallel-sort-spill' into topn-spill
xzhangxian1008 Mar 8, 2024
85c827a
save
xzhangxian1008 Mar 11, 2024
3393c2d
tweaking
xzhangxian1008 Mar 11, 2024
cc7b2d7
tweaking
xzhangxian1008 Mar 11, 2024
7b02433
save
xzhangxian1008 Mar 11, 2024
894cb60
refine
xzhangxian1008 Mar 11, 2024
5f25004
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 11, 2024
3a0a0df
Merge branch 'parallel-sort-spill' into topn-spill
xzhangxian1008 Mar 11, 2024
a66000d
rename
xzhangxian1008 Mar 11, 2024
80dd632
save
xzhangxian1008 Mar 11, 2024
fef756a
address comments
xzhangxian1008 Mar 12, 2024
6e148ff
wait for test
xzhangxian1008 Mar 13, 2024
08cd35a
prepare for tests
xzhangxian1008 Mar 13, 2024
77d5db2
tweaking
xzhangxian1008 Mar 13, 2024
f8c8528
add some tests
xzhangxian1008 Mar 14, 2024
f12384c
save
xzhangxian1008 Mar 14, 2024
fa615cf
Merge branch 'master' of https://github.com/pingcap/tidb into topn-spill
xzhangxian1008 Mar 15, 2024
4eb248a
save
xzhangxian1008 Mar 15, 2024
4613f80
fix bugs
xzhangxian1008 Mar 15, 2024
e8460aa
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 18, 2024
41c50c7
tweaking
xzhangxian1008 Mar 18, 2024
a59916d
tweaking
xzhangxian1008 Mar 18, 2024
91c026f
add tests, need failpoint tests
xzhangxian1008 Mar 18, 2024
391c4a8
tweaking
xzhangxian1008 Mar 18, 2024
a59454c
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 18, 2024
1851c41
merge
xzhangxian1008 Mar 18, 2024
9418fe4
add failpoint
xzhangxian1008 Mar 18, 2024
520f30a
tweaking
xzhangxian1008 Mar 18, 2024
d1c8fae
fix
xzhangxian1008 Mar 19, 2024
fcfb72a
tweaking
xzhangxian1008 Mar 19, 2024
62f54c6
tweaking
xzhangxian1008 Mar 19, 2024
0bdf99a
fix init
xzhangxian1008 Mar 19, 2024
8d55139
fix bug
xzhangxian1008 Mar 19, 2024
50e08c8
fix bugs
xzhangxian1008 Mar 20, 2024
5b91f31
refine test
xzhangxian1008 Mar 20, 2024
2cc3a62
fix bug
xzhangxian1008 Mar 21, 2024
5b44802
save
xzhangxian1008 Mar 21, 2024
892f158
add implement
xzhangxian1008 Mar 22, 2024
581c1e2
add implementation
xzhangxian1008 Mar 22, 2024
d159e28
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 26, 2024
72e38d6
address comments
xzhangxian1008 Mar 26, 2024
4dcedf1
merge
xzhangxian1008 Mar 26, 2024
f75d8ba
fix bugs
xzhangxian1008 Mar 26, 2024
249381f
tweaking
xzhangxian1008 Mar 26, 2024
c776066
update bazel
xzhangxian1008 Mar 26, 2024
756bd45
fix bugs
xzhangxian1008 Mar 26, 2024
f196b09
merge
xzhangxian1008 Mar 28, 2024
9f9adac
address comments
xzhangxian1008 Apr 16, 2024
466cd71
add picture
xzhangxian1008 Apr 17, 2024
e9dee8a
address comment
xzhangxian1008 Apr 19, 2024
1da7bf1
Merge branch 'master' of ssh://github.com/pingcap/tidb into topn-spill
xzhangxian1008 Apr 22, 2024
caef88b
tweaking
xzhangxian1008 Apr 22, 2024
d9e091a
Merge branch 'master' of ssh://github.com/pingcap/tidb into topn-spill
xzhangxian1008 Apr 26, 2024
b2d6122
address comment
xzhangxian1008 Apr 26, 2024
e792877
Merge branch 'master' of ssh://github.com/pingcap/tidb into topn-spill
xzhangxian1008 Apr 28, 2024
022f7e9
add tests
xzhangxian1008 Apr 28, 2024
b9103a0
remove useless codes
xzhangxian1008 Apr 30, 2024
1acfca2
fix ci
xzhangxian1008 May 10, 2024
a299645
fix ci
xzhangxian1008 May 11, 2024
2880c9c
Merge branch 'master' of ssh://github.com/pingcap/tidb into topn-spill
xzhangxian1008 May 14, 2024
9e0a1d2
fix ci
xzhangxian1008 May 14, 2024
621df22
fix
xzhangxian1008 May 14, 2024
988f869
Merge branch 'master' of ssh://github.com/pingcap/tidb into topn-spill
xzhangxian1008 May 15, 2024
7ac8f76
fix ci
xzhangxian1008 May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add some tests
xzhangxian1008 committed Mar 14, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit f8c8528a19574c38e16aa029dc297d7804fced4a
11 changes: 9 additions & 2 deletions pkg/executor/sortexec/sort_spill_test.go
Original file line number Diff line number Diff line change
@@ -16,11 +16,13 @@ package sortexec_test

import (
"context"
"fmt"
"sort"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
"github.com/pingcap/tidb/pkg/executor/sortexec"
@@ -98,10 +100,13 @@ func (r *resultChecker) initRowPtrs() {
}
}

func (r *resultChecker) check(resultChunks []*chunk.Chunk) bool {
func (r *resultChecker) check(resultChunks []*chunk.Chunk, isTopN bool, offset uint64, count uint64) bool {
if r.rowPtrs == nil {
r.initRowPtrs()
sort.Slice(r.rowPtrs, r.keyColumnsLess)
if isTopN {
r.rowPtrs = r.rowPtrs[offset : offset+count]
}
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}

cursor := 0
@@ -128,6 +133,8 @@ func (r *resultChecker) check(resultChunks []*chunk.Chunk) bool {
expectRow := r.savedChunks[r.rowPtrs[cursor].ChkIdx].GetRow(int(r.rowPtrs[cursor].RowIdx))
expect := expectRow.ToString(fieldTypes)

log.Info(fmt.Sprintf("res: %s, expect: %s", res, expect))

if res != expect {
return false
}
@@ -212,7 +219,7 @@ func executeSortExecutorAndManullyTriggerSpill(t *testing.T, exe *sortexec.SortE
func checkCorrectness(schema *expression.Schema, exe *sortexec.SortExec, dataSource *testutil.MockDataSource, resultChunks []*chunk.Chunk) bool {
keyColumns, keyCmpFuncs, byItemsDesc := exe.GetSortMetaForTest()
checker := newResultChecker(schema, keyColumns, keyCmpFuncs, byItemsDesc, dataSource.GenData)
return checker.check(resultChunks)
return checker.check(resultChunks, false, 0, 0)
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}

func onePartitionAndAllDataInMemoryCase(t *testing.T, ctx *mock.Context, sortCase *testutil.SortCase) {
35 changes: 21 additions & 14 deletions pkg/executor/sortexec/topn.go
Original file line number Diff line number Diff line change
@@ -73,8 +73,8 @@ func (e *TopNExec) Open(ctx context.Context) error {
workers := make([]*topNWorker, concurrency)
for i := range workers {
chkHeap := &topNChunkHeap{}
chkHeap.init(e, e.Limit.Offset+e.Limit.Count, int(e.Limit.Offset), e.greaterRow)
workers[i] = newTopNWorker(e.fetcherAndWorkerSyncer, e.resultChannel, e.finishCh, e, chkHeap)
chkHeap.init(e, e.memTracker, e.Limit.Offset+e.Limit.Count, int(e.Limit.Offset), e.greaterRow)
workers[i] = newTopNWorker(e.fetcherAndWorkerSyncer, e.resultChannel, e.finishCh, e, chkHeap, e.memTracker)
}

e.spillHelper = newTopNSpillerHelper(
@@ -97,7 +97,7 @@ func (e *TopNExec) Open(ctx context.Context) error {
func (e *TopNExec) Close() error {
close(e.finishCh)
if e.fetched.CompareAndSwap(false, true) {
close(e.Parallel.resultChannel)
close(e.resultChannel)
return nil
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}

@@ -110,7 +110,12 @@ func (e *TopNExec) Close() error {
e.chkHeap = nil
e.spillHelper = nil
e.spillAction = nil
return nil

if e.memTracker != nil {
e.memTracker.ReplaceBytesUsed(0)
}

return exec.Close(e.Children(0))
}

func (e *TopNExec) greaterRow(rowI, rowJ chunk.Row) bool {
@@ -165,7 +170,7 @@ func (e *TopNExec) fetchChunks(ctx context.Context) error {
func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
e.initCompareFuncs()
e.buildKeyColumns()
e.chkHeap.init(e, e.Limit.Offset+e.Limit.Count, int(e.Limit.Offset), e.greaterRow)
e.chkHeap.init(e, e.memTracker, e.Limit.Offset+e.Limit.Count, int(e.Limit.Offset), e.greaterRow)
for uint64(e.chkHeap.rowChunks.Len()) < e.chkHeap.totalLimit {
srcChk := exec.TryNewCacheChunk(e.Children(0))
// adjust required rows by total limit
@@ -324,7 +329,13 @@ func (e *TopNExec) executeTopNWithSpill(ctx context.Context) error {
}

func (e *TopNExec) executeTopN(ctx context.Context) {
defer close(e.resultChannel)
defer func() {
if r := recover(); r != nil {
processPanicAndLog(e.resultChannel, r)
}

close(e.resultChannel)
}()

heap.Init(e.chkHeap)
for uint64(len(e.chkHeap.rowPtrs)) > e.chkHeap.totalLimit {
@@ -398,14 +409,6 @@ func (e *TopNExec) generateTopNResultsWithSpill() error {
}

func (e *TopNExec) generateTopNResults() {
defer func() {
if r := recover(); r != nil {
processPanicAndLog(e.resultChannel, r)
}

close(e.resultChannel)
}()

if !e.spillHelper.isSpillTriggered() {
if !e.generateTopNResultsWithNoSpill() {
return
@@ -420,6 +423,10 @@ func (e *TopNExec) generateTopNResults() {
e.generateTopNResultsWithSpill()
}

// IsSpillTriggeredForTest reports whether the executor's spill helper says a
// spill has been triggered. As the name indicates, it exists for tests only.
//
// NOTE(review): Close sets e.spillHelper to nil, so this presumably has to be
// called before Close to avoid a nil dereference — confirm against callers.
func (e *TopNExec) IsSpillTriggeredForTest() bool {
return e.spillHelper.isSpillTriggered()
}

func injectTopNRandomFail(triggerFactor int32) {
failpoint.Inject("TopNRandomFail", func(val failpoint.Value) {
if val.(bool) {
4 changes: 3 additions & 1 deletion pkg/executor/sortexec/topn_chunk_heap.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,9 @@ type topNChunkHeap struct {
idx int
}

func (h *topNChunkHeap) init(topnExec *TopNExec, totalLimit uint64, idx int, greaterRow func(chunk.Row, chunk.Row) bool) {
func (h *topNChunkHeap) init(topnExec *TopNExec, memTracker *memory.Tracker, totalLimit uint64, idx int, greaterRow func(chunk.Row, chunk.Row) bool) {
h.memTracker = memTracker

h.rowChunks = chunk.NewList(exec.RetTypes(topnExec), topnExec.InitCap(), topnExec.MaxChunkSize())
h.rowChunks.GetMemTracker().AttachTo(h.memTracker)
h.rowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks)
107 changes: 90 additions & 17 deletions pkg/executor/sortexec/topn_spill_test.go
Original file line number Diff line number Diff line change
@@ -15,60 +15,133 @@
package sortexec_test

import (
"context"
"testing"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
)

// checkTopNCorrectness verifies the output of a TopN executor.
//
// It builds a resultChecker from the executor's sort metadata and the data
// generated by dataSource, then runs the check in TopN mode so that only the
// expected rows in the window [offset, offset+count) of the sorted input are
// compared against resultChunks. It returns the checker's verdict.
func checkTopNCorrectness(schema *expression.Schema, exe *sortexec.TopNExec, dataSource *testutil.MockDataSource, resultChunks []*chunk.Chunk, offset uint64, count uint64) bool {
keyColumns, keyCmpFuncs, byItemsDesc := exe.GetSortMetaForTest()
checker := newResultChecker(schema, keyColumns, keyCmpFuncs, byItemsDesc, dataSource.GenData)
// isTopN=true makes the checker trim the expected rows to the offset/count window.
return checker.check(resultChunks, true, offset, count)
}

// buildTopNExec constructs a TopNExec for tests.
//
// It prepares the chunks of dataSource, builds a SortExec that reads from
// dataSource and orders by the columns listed in sortCase.OrderByIdx, and wraps
// it in a TopNExec whose limit is the given offset and count. The returned
// executor has not been opened.
func buildTopNExec(sortCase *testutil.SortCase, dataSource *testutil.MockDataSource, offset uint64, count uint64) *sortexec.TopNExec {
dataSource.PrepareChunks()
sortExec := sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(sortCase.Ctx, dataSource.Schema(), 0, dataSource),
ByItems: make([]*plannerutil.ByItems, 0, len(sortCase.OrderByIdx)),
ExecSchema: dataSource.Schema(),
}

// One ByItems entry per order-by column index.
for _, idx := range sortCase.OrderByIdx {
sortExec.ByItems = append(sortExec.ByItems, &plannerutil.ByItems{Expr: sortCase.Columns()[idx]})
}

topNexec := &sortexec.TopNExec{
SortExec: sortExec,
Limit: &plannercore.PhysicalLimit{Offset: offset, Count: count},
}

return topNexec
}

// executeTopNExecutor opens exe and drains it, returning every non-empty chunk
// it produced, in order.
//
// Any error from Open or Next fails the test immediately. The executor is not
// closed here; that is left to the caller.
func executeTopNExecutor(t *testing.T, exe *sortexec.TopNExec) []*chunk.Chunk {
tmpCtx := context.Background()
err := exe.Open(tmpCtx)
require.NoError(t, err)

resultChunks := make([]*chunk.Chunk, 0)
chk := exec.NewFirstChunk(exe)
for {
err = exe.Next(tmpCtx, chk)
require.NoError(t, err)
// An empty chunk signals that the executor is exhausted.
if chk.NumRows() == 0 {
break
}
// chk is reused for every Next call, so store a copy of its contents.
resultChunks = append(resultChunks, chk.CopyConstruct())
}
return resultChunks
}

// No spill will be triggered in this test
func topNNoSpillTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
func topNNoSpillCase(t *testing.T, exe *sortexec.TopNExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource, offset uint64, count uint64) {
if exe == nil {
exe = buildTopNExec(sortCase, dataSource, offset, count)
}
dataSource.PrepareChunks()
resultChunks := executeTopNExecutor(t, exe)

require.False(t, exe.IsSpillTriggeredForTest())

err := exe.Close()
require.NoError(t, err)

require.True(t, checkTopNCorrectness(schema, exe, dataSource, resultChunks, offset, count))
}

// The TopN executor has two stages:
// 1. Building the heap: in this stage all received rows are inserted into the heap.
// 2. Updating the heap: in this stage only rows that are smaller than the heap top can be inserted, and the heap top is dropped.
//
// Case1 means that the spill is triggered in stage 1.
func topNSpillCase1Test(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
func topNSpillCase1(t *testing.T, ctx *mock.Context, exe *sortexec.TopNExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {

}

// Case2 means that we will trigger spill in stage 2
func topNSpillCase2Test(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
func topNSpillCase2(t *testing.T, ctx *mock.Context, exe *sortexec.TopNExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {

}

// After sorting, all rows are in memory, and the spill is triggered after some chunks have been fetched.
func topNInMemoryThenSpillCase(t *testing.T, ctx *mock.Context, exe *sortexec.TopNExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {

}

// TODO test offset not 0
func TestTopNSpillDisk(t *testing.T) {
totalRowNum := 100
sortexec.SetSmallSpillChunkSizeForTest()
ctx := mock.NewContext()
sortCase := &testutil.SortCase{Rows: 10000, OrderByIdx: []int{0, 1}, Ndvs: []int{0, 0}, Ctx: ctx}

// TODO add slow random fail point for topn failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/SlowSomeWorkers", `return(true)`)
topNCase := &testutil.SortCase{Rows: totalRowNum, OrderByIdx: []int{0, 1}, Ndvs: []int{0, 0}, Ctx: ctx}

ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit1)
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)

schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(ctx, sortCase, schema)
exe := buildSortExec(ctx, sortCase, dataSource)
for i := 0; i < 10; i++ {
oneSpillCase(t, ctx, nil, sortCase, schema, dataSource)
oneSpillCase(t, ctx, exe, sortCase, schema, dataSource)
offset := uint64(totalRowNum / 10)
count := uint64(totalRowNum / 3)

schema := expression.NewSchema(topNCase.Columns()...)
dataSource := buildDataSource(ctx, topNCase, schema)
exe := buildTopNExec(topNCase, dataSource, offset, count)
for i := 0; i < 1; i++ {
log.Info("xzxdebug 1---------")
topNNoSpillCase(t, nil, topNCase, schema, dataSource, 0, count)
log.Info("xzxdebug 2---------")
topNNoSpillCase(t, exe, topNCase, schema, dataSource, 12, count)
}

ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
for i := 0; i < 10; i++ {
// TODO add slow random fail point for topn failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/SlowSomeWorkers", `return(true)`)
// ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2)
// ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// for i := 0; i < 10; i++ {

}
// }
}

func TestTopNSpillDiskFailpoint(t *testing.T) {
14 changes: 9 additions & 5 deletions pkg/executor/sortexec/topn_worker.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
"sync"

"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/memory"
)

// topNWorker is used only when topn spill is triggered
@@ -27,22 +28,25 @@ type topNWorker struct {
errOutputChan chan<- rowWithError
finishChan <-chan struct{}

topn *TopNExec
chkHeap *topNChunkHeap
topn *TopNExec
chkHeap *topNChunkHeap
memTracker *memory.Tracker
}

func newTopNWorker(
fetcherAndWorkerSyncer *sync.WaitGroup,
errOutputChan chan<- rowWithError,
finishChan <-chan struct{},
topn *TopNExec,
chkHeap *topNChunkHeap) *topNWorker {
chkHeap *topNChunkHeap,
memTracker *memory.Tracker) *topNWorker {
return &topNWorker{
fetcherAndWorkerSyncer: fetcherAndWorkerSyncer,
errOutputChan: errOutputChan,
finishChan: finishChan,
chkHeap: chkHeap,
topn: topn,
memTracker: memTracker,
}
}

@@ -51,7 +55,7 @@ func (t *topNWorker) setChunkChannel(chunkChannel <-chan *chunk.Chunk) {
}

func (t *topNWorker) fetchChunksAndProcess() {
t.chkHeap.init(t.topn, t.topn.Limit.Offset+t.topn.Limit.Count, int(t.topn.Limit.Offset), t.topn.greaterRow)
t.chkHeap.init(t.topn, t.memTracker, t.topn.Limit.Offset+t.topn.Limit.Count, int(t.topn.Limit.Offset), t.topn.greaterRow)
for t.fetchChunksAndProcessImpl() {
}
}
@@ -68,7 +72,7 @@ func (t *topNWorker) fetchChunksAndProcessImpl() bool {

if uint64(t.chkHeap.rowChunks.Len()) < t.chkHeap.totalLimit {
if !t.chkHeap.isInitialized {
t.chkHeap.init(t.topn, t.topn.Limit.Offset+t.topn.Limit.Count, int(t.topn.Limit.Offset), t.topn.greaterRow)
t.chkHeap.init(t.topn, t.memTracker, t.topn.Limit.Offset+t.topn.Limit.Count, int(t.topn.Limit.Offset), t.topn.greaterRow)
}
t.chkHeap.rowChunks.Add(chk)
} else {