diff --git a/Makefile b/Makefile index f0812409c8a07..ac43ab272dbb2 100644 --- a/Makefile +++ b/Makefile @@ -312,7 +312,7 @@ tools/bin/revive: .PHONY: tools/bin/failpoint-ctl tools/bin/failpoint-ctl: - GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/failpoint/failpoint-ctl@2eaa328 + GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/failpoint/failpoint-ctl@9b3b6e3 .PHONY: tools/bin/errdoc-gen tools/bin/errdoc-gen: diff --git a/pkg/executor/index_lookup_hash_join.go b/pkg/executor/index_lookup_hash_join.go index 8dba9d3673215..06bb650d81ed3 100644 --- a/pkg/executor/index_lookup_hash_join.go +++ b/pkg/executor/index_lookup_hash_join.go @@ -563,6 +563,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde select { case joinResult.chk, ok = <-iw.joinChkResourceCh: case <-ctx.Done(): + joinResult.err = ctx.Err() return joinResult, false } return joinResult, ok @@ -783,7 +784,10 @@ func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Contex select { case iw.resultCh <- joinResult: case <-ctx.Done(): + joinResult.err = ctx.Err() + return false, joinResult } + failpoint.InjectCall("joinMatchedInnerRow2Chunk") joinResult, ok = iw.getNewJoinResult(ctx) if !ok { return false, joinResult diff --git a/pkg/executor/index_lookup_join_test.go b/pkg/executor/index_lookup_join_test.go index 4715fa0eb23e0..c894d802d7163 100644 --- a/pkg/executor/index_lookup_join_test.go +++ b/pkg/executor/index_lookup_join_test.go @@ -18,10 +18,12 @@ import ( "context" "fmt" "math/rand" + "runtime" "strings" "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) @@ -133,3 +135,41 @@ func TestIssue45716(t *testing.T) { err := tk.QueryToErr("select /*+ inl_join(t2) */ * from t1 join t2 on t1.a = t2.a;") tk.MustContainErrMsg(err.Error(), "test inlNewInnerPanic") } + +func TestIssue54688(t *testing.T) { + val := runtime.GOMAXPROCS(1) + defer func() { + runtime.GOMAXPROCS(val) + }() + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t, s;") + tk.MustExec("create table t(a int, index(a));") + tk.MustExec("create table s(a int, index(a));") + tk.MustExec("insert into t values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16);") + tk.MustExec("insert into s values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16);") + tk.MustExec("insert into s select * from s") + tk.MustExec("insert into s select * from s") + tk.MustExec("insert into s select * from s") + tk.MustExec("insert into s select * from s") + tk.MustExec("insert into s select * from s") + tk.MustExec("insert into s select * from s") + tk.MustExec("insert into s select * from s") + tk.MustExec("insert into s select * from s") + tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;") + tk.MustExec("set @@tidb_index_join_batch_size=1000000;") + + for i := 0; i <= 100; i++ { + rs, err := tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a") + require.NoError(t, err) + context, cancel := context.WithCancel(context.Background()) + require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/pkg/executor/joinMatchedInnerRow2Chunk", + func() { + cancel() + }, + )) + _, _ = session.GetRows4Test(context, nil, rs) + rs.Close() + } +}