Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#54855
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
wshwsh12 authored and ti-chi-bot committed Jul 29, 2024
1 parent 42b624c commit 57da927
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
select {
case joinResult.chk, ok = <-iw.joinChkResourceCh:
case <-ctx.Done():
joinResult.err = ctx.Err()
return joinResult, false
}
return joinResult, ok
Expand Down Expand Up @@ -783,7 +784,10 @@ func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Contex
select {
case iw.resultCh <- joinResult:
case <-ctx.Done():
joinResult.err = ctx.Err()
return false, joinResult
}
failpoint.InjectCall("joinMatchedInnerRow2Chunk")
joinResult, ok = iw.getNewJoinResult(ctx)
if !ok {
return false, joinResult
Expand Down
40 changes: 40 additions & 0 deletions pkg/executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"context"
"fmt"
"math/rand"
"runtime"
"strings"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -133,3 +135,41 @@ func TestIssue45716(t *testing.T) {
err := tk.QueryToErr("select /*+ inl_join(t2) */ * from t1 join t2 on t1.a = t2.a;")
tk.MustContainErrMsg(err.Error(), "test inlNewInnerPanic")
}

func TestIssue54688(t *testing.T) {
val := runtime.GOMAXPROCS(1)
defer func() {
runtime.GOMAXPROCS(val)
}()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t, s;")
tk.MustExec("create table t(a int, index(a));")
tk.MustExec("create table s(a int, index(a));")
tk.MustExec("insert into t values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16);")
tk.MustExec("insert into s values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16);")
tk.MustExec("insert into s select * from s")
tk.MustExec("insert into s select * from s")
tk.MustExec("insert into s select * from s")
tk.MustExec("insert into s select * from s")
tk.MustExec("insert into s select * from s")
tk.MustExec("insert into s select * from s")
tk.MustExec("insert into s select * from s")
tk.MustExec("insert into s select * from s")
tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;")
tk.MustExec("set @@tidb_index_join_batch_size=1000000;")

for i := 0; i <= 100; i++ {
rs, err := tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a")
require.NoError(t, err)
context, cancel := context.WithCancel(context.Background())
require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/pkg/executor/join/joinMatchedInnerRow2Chunk",
func() {
cancel()
},
))
_, _ = session.GetRows4Test(context, nil, rs)
rs.Close()
}
}
108 changes: 108 additions & 0 deletions pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "join",
srcs = [
"base_join_probe.go",
"concurrent_map.go",
"hash_join_base.go",
"hash_join_v1.go",
"hash_join_v2.go",
"hash_table_v1.go",
"hash_table_v2.go",
"index_lookup_hash_join.go",
"index_lookup_join.go",
"index_lookup_merge_join.go",
"inner_join_probe.go",
"join_row_table.go",
"joiner.go",
"merge_join.go",
"outer_join_probe.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/join",
visibility = ["//visibility:public"],
deps = [
"//pkg/executor/aggregate",
"//pkg/executor/internal/applycache",
"//pkg/executor/internal/exec",
"//pkg/executor/internal/vecgroupchecker",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/core",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/types",
"//pkg/util",
"//pkg/util/bitmap",
"//pkg/util/channel",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/collate",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/disk",
"//pkg/util/execdetails",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/mvmap",
"//pkg/util/ranger",
"//pkg/util/serialization",
"//pkg/util/sqlkiller",
"//pkg/util/syncutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "join_test",
timeout = "short",
srcs = [
"bench_test.go",
"concurrent_map_test.go",
"hash_table_v1_test.go",
"hash_table_v2_test.go",
"index_lookup_join_test.go",
"index_lookup_merge_join_test.go",
"inner_join_probe_test.go",
"join_row_table_test.go",
"join_stats_test.go",
"joiner_test.go",
"left_outer_join_probe_test.go",
"merge_join_test.go",
"right_outer_join_probe_test.go",
"row_table_builder_test.go",
],
embed = [":join"],
flaky = True,
shard_count = 48,
deps = [
"//pkg/config",
"//pkg/domain",
"//pkg/executor/internal/testutil",
"//pkg/expression",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/disk",
"//pkg/util/hack",
"//pkg/util/memory",
"//pkg/util/mock",
"//pkg/util/sqlkiller",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)

0 comments on commit 57da927

Please sign in to comment.