Skip to content

Commit

Permalink
cherry pick pingcap#30471 to release-5.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
XuHuaiyu authored and ti-srebot committed Dec 16, 2021
1 parent d0e3c62 commit fb8e4d4
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 8 deletions.
210 changes: 210 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8279,3 +8279,213 @@ func (s *testSuite) TestIssue26532(c *C) {
tk.MustQuery("select greatest(\"2020-01-01 01:01:01\" ,\"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2020-01-01 01:01:01", "<nil>"))
tk.MustQuery("select least(\"2020-01-01 01:01:01\" , \"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2019-01-01 01:01:01", "<nil>"))
}
<<<<<<< HEAD
=======

func (s *testSuite) TestIssue25447(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a int, b varchar(8))")
tk.MustExec("insert into t1 values(1,'1')")
tk.MustExec("create table t2(a int , b varchar(8) GENERATED ALWAYS AS (c) VIRTUAL, c varchar(8), PRIMARY KEY (a))")
tk.MustExec("insert into t2(a) values(1)")
tk.MustQuery("select /*+ tidb_inlj(t2) */ t2.b, t1.b from t1 join t2 ON t2.a=t1.a").Check(testkit.Rows("<nil> 1"))
}

func (s *testSuite) TestIssue23602(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("USE test")
tk.Se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
tk.MustExec("CREATE TABLE t (a bigint unsigned PRIMARY KEY)")
defer tk.MustExec("DROP TABLE t")
tk.MustExec("INSERT INTO t VALUES (0),(1),(2),(3),(18446744073709551600),(18446744073709551605),(18446744073709551610),(18446744073709551615)")
tk.MustExec("ANALYZE TABLE t")
tk.MustQuery(`EXPLAIN FORMAT = 'brief' SELECT a FROM t WHERE a >= 0x1 AND a <= 0x2`).Check(testkit.Rows(
"TableReader 2.00 root data:TableRangeScan]\n" +
"[└─TableRangeScan 2.00 cop[tikv] table:t range:[1,2], keep order:false"))
tk.MustQuery(`EXPLAIN FORMAT = 'brief' SELECT a FROM t WHERE a BETWEEN 0x1 AND 0x2`).Check(testkit.Rows(
"TableReader 2.00 root data:TableRangeScan]\n" +
"[└─TableRangeScan 2.00 cop[tikv] table:t range:[1,2], keep order:false"))
tk.MustQuery("SELECT a FROM t WHERE a BETWEEN 0xFFFFFFFFFFFFFFF5 AND X'FFFFFFFFFFFFFFFA'").Check(testkit.Rows("18446744073709551605", "18446744073709551610"))
}

func (s *testSuite) TestCTEWithIndexLookupJoinDeadLock(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t (a int(11) default null,b int(11) default null,key b (b),key ba (b))")
tk.MustExec("create table t1 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))")
tk.MustExec("create table t2 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))")
// It's easy to reproduce this problem in 30 times execution of IndexLookUpJoin.
for i := 0; i < 30; i++ {
tk.MustExec("with cte as (with cte1 as (select * from t2 use index(idx_ab) where a > 1 and b > 1) select * from cte1) select /*+use_index(t1 idx_ab)*/ * from cte join t1 on t1.a=cte.a;")
}
}

func (s *testSuite) TestGetResultRowsCount(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
for i := 1; i <= 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%v)", i))
}
cases := []struct {
sql string
row int64
}{
{"select * from t", 10},
{"select * from t where a < 0", 0},
{"select * from t where a <= 3", 3},
{"insert into t values (11)", 0},
{"replace into t values (12)", 0},
{"update t set a=13 where a=12", 0},
}

for _, ca := range cases {
if strings.HasPrefix(ca.sql, "select") {
tk.MustQuery(ca.sql)
} else {
tk.MustExec(ca.sql)
}
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(plannercore.Plan)
c.Assert(ok, IsTrue)
cnt := executor.GetResultRowsCount(tk.Se, p)
c.Assert(ca.row, Equals, cnt, Commentf("sql: %v", ca.sql))
}
}

func checkFileName(s string) bool {
files := []string{
"config.toml",
"meta.txt",
"stats/test.t_dump_single.json",
"schema/test.t_dump_single.schema.txt",
"variables.toml",
"sqls.sql",
"session_bindings.sql",
"global_bindings.sql",
"explain.txt",
}
for _, f := range files {
if strings.Compare(f, s) == 0 {
return true
}
}
return false
}

func (s *testSuiteWithData) TestPlanReplayerDumpSingle(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t_dump_single")
tk.MustExec("create table t_dump_single(a int)")
res := tk.MustQuery("plan replayer dump explain select * from t_dump_single")
path := s.testData.ConvertRowsToStrings(res.Rows())

reader, err := zip.OpenReader(filepath.Join(domain.GetPlanReplayerDirName(), path[0]))
c.Assert(err, IsNil)
defer reader.Close()
for _, file := range reader.File {
c.Assert(checkFileName(file.Name), IsTrue)
}
}

func (s *testSuiteP1) TestIssue28935(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_vectorized_expression=true")
tk.MustQuery(`select trim(leading from " a "), trim(both from " a "), trim(trailing from " a ")`).Check(testkit.Rows("a a a"))
tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows("<nil> <nil> <nil>"))
tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("<nil>"))

tk.MustExec("set @@tidb_enable_vectorized_expression=false")
tk.MustQuery(`select trim(leading from " a "), trim(both from " a "), trim(trailing from " a ")`).Check(testkit.Rows("a a a"))
tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows("<nil> <nil> <nil>"))
tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("<nil>"))
}

func (s *testSuiteP1) TestIssue29412(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t29142_1")
tk.MustExec("drop table if exists t29142_2")
tk.MustExec("create table t29142_1(a int);")
tk.MustExec("create table t29142_2(a double);")
tk.MustExec("insert into t29142_1 value(20);")
tk.MustQuery("select sum(distinct a) as x from t29142_1 having x > some ( select a from t29142_2 where x in (a));").Check(nil)
}

func (s *testSerialSuite) TestIssue28650(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(a int, index(a));")
tk.MustExec("create table t2(a int, c int, b char(50), index(a,c,b));")
tk.MustExec("set tidb_enable_rate_limit_action=off;")

wg := &sync.WaitGroup{}
sql := `explain analyze
select /*+ stream_agg(@sel_1) stream_agg(@sel_3) %s(@sel_2 t2)*/ count(1) from
(
SELECT t2.a AS t2_external_user_ext_id, t2.b AS t2_t1_ext_id FROM t2 INNER JOIN (SELECT t1.a AS d_t1_ext_id FROM t1 GROUP BY t1.a) AS anon_1 ON anon_1.d_t1_ext_id = t2.a WHERE t2.c = 123 AND t2.b
IN ("%s") ) tmp`

wg.Add(1)
sqls := make([]string, 2)
go func() {
defer wg.Done()
inElems := make([]string, 1000)
for i := 0; i < len(inElems); i++ {
inElems[i] = fmt.Sprintf("wm_%dbDgAAwCD-v1QB%dxky-g_dxxQCw", rand.Intn(100), rand.Intn(100))
}
sqls[0] = fmt.Sprintf(sql, "inl_join", strings.Join(inElems, "\",\""))
sqls[1] = fmt.Sprintf(sql, "inl_hash_join", strings.Join(inElems, "\",\""))
}()

tk.MustExec("insert into t1 select rand()*400;")
for i := 0; i < 10; i++ {
tk.MustExec("insert into t1 select rand()*400 from t1;")
}
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMAction = config.OOMActionCancel
})
defer func() {
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMAction = config.OOMActionLog
})
}()
wg.Wait()
for _, sql := range sqls {
tk.MustExec("set @@tidb_mem_quota_query = 1073741824") // 1GB
c.Assert(tk.QueryToErr(sql), IsNil)
tk.MustExec("set @@tidb_mem_quota_query = 33554432") // 32MB, out of memory during executing
c.Assert(strings.Contains(tk.QueryToErr(sql).Error(), "Out Of Memory Quota!"), IsTrue)
tk.MustExec("set @@tidb_mem_quota_query = 65536") // 64KB, out of memory during building the plan
func() {
defer func() {
r := recover()
c.Assert(r, NotNil)
err := errors.Errorf("%v", r)
c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue)
}()
tk.MustExec(sql)
}()
}
}

func (s *testSerialSuite) TestIssue30289(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
fpName := "github.com/pingcap/tidb/executor/issue30289"
c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
c.Assert(err.Error(), Matches, "issue30289 build return error")
}
>>>>>>> 80ea2389d... executor: HashJoinExec checks the buildError even if the probeSide is empty (#30471)
13 changes: 12 additions & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})
if probeSideResult.NumRows() == 0 && !e.useOuterToBuild {
e.finished.Store(true)
return
}
emptyBuild, buildErr := e.wait4BuildSide()
if buildErr != nil {
Expand Down Expand Up @@ -260,6 +264,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) {
func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) {
defer close(chkCh)
var err error
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
err = errors.Errorf("issue30289 build return error")
e.buildFinished <- errors.Trace(err)
return
}
})
for {
if e.finished.Load().(bool) {
return
Expand Down
28 changes: 21 additions & 7 deletions executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,25 +141,39 @@ func (e *ShuffleExec) Close() error {
if !e.prepared {
for _, w := range e.workers {
for _, r := range w.receivers {
close(r.inputHolderCh)
close(r.inputCh)
if r.inputHolderCh != nil {
close(r.inputHolderCh)
}
if r.inputCh != nil {
close(r.inputCh)
}
}
close(w.outputHolderCh)
if w.outputHolderCh != nil {
close(w.outputHolderCh)
}
}
if e.outputCh != nil {
close(e.outputCh)
}
close(e.outputCh)
}
close(e.finishCh)
if e.finishCh != nil {
close(e.finishCh)
}
for _, w := range e.workers {
for _, r := range w.receivers {
for range r.inputCh {
if r.inputCh != nil {
for range r.inputCh {
}
}
}
// close child executor of each worker
if err := w.childExec.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
if e.outputCh != nil {
for range e.outputCh { // workers exit before `e.outputCh` is closed.
}
}
e.executed = false

Expand Down

0 comments on commit fb8e4d4

Please sign in to comment.