diff --git a/executor/cte.go b/executor/cte.go index 7c3d4fe128567..728310e995c1b 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -110,15 +110,24 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { // Close implements the Executor interface. func (e *CTEExec) Close() (err error) { - e.producer.resTbl.Lock() - if !e.producer.closed { - // closeProducer() only close seedExec and recursiveExec, will not touch resTbl. - // It means you can still read resTbl after call closeProducer(). - // You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next(). - // Separating these three function calls is only to follow the abstraction of the volcano model. - err = e.producer.closeProducer() - } - e.producer.resTbl.Unlock() + func() { + e.producer.resTbl.Lock() + defer e.producer.resTbl.Unlock() + if !e.producer.closed { + failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) { + ok := v.(bool) + if ok { + // mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work. + panic(exeerrors.ErrMemoryExceedForQuery) + } + }) + // closeProducer() only close seedExec and recursiveExec, will not touch resTbl. + // It means you can still read resTbl after call closeProducer(). + // You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next(). + // Separating these three function calls is only to follow the abstraction of the volcano model. + err = e.producer.closeProducer() + } + }() if err != nil { return err } diff --git a/pkg/executor/cte_test.go b/pkg/executor/cte_test.go new file mode 100644 index 0000000000000..e94fae83759b1 --- /dev/null +++ b/pkg/executor/cte_test.go @@ -0,0 +1,198 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "fmt" + "math/rand" + "slices" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" + "github.com/stretchr/testify/require" +) + +func TestCTEIssue49096(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test;") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock")) + }() + insertStr := "insert into t1 values(0)" + rowNum := 10 + vals := make([]int, rowNum) + vals[0] = 0 + for i := 1; i < rowNum; i++ { + v := rand.Intn(100) + vals[i] = v + insertStr += fmt.Sprintf(", (%d)", v) + } + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int);") + tk.MustExec("create table t2(c1 int);") + tk.MustExec(insertStr) + // should be insert statement, otherwise it couldn't step int resetCTEStorageMap in handleNoDelay func. + sql := "insert into t2 with cte1 as ( " + + "select c1 from t1) " + + "select c1 from cte1 natural join (select * from cte1 where c1 > 0) cte2 order by c1;" + err := tk.ExecToErr(sql) + require.NotNil(t, err) + require.Equal(t, "[executor:8175]Your query has been cancelled due to exceeding the allowed memory limit for a single SQL query. Please try narrowing your query scope or increase the tidb_mem_quota_query limit and try again.[conn=%d]", err.Error()) +} + +func TestSpillToDisk(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 1") + defer tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 0") + tk.MustExec("use test;") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill")) + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill")) + }() + + // Use duplicated rows to test UNION DISTINCT. + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + insertStr := "insert into t1 values(0)" + rowNum := 1000 + vals := make([]int, rowNum) + vals[0] = 0 + for i := 1; i < rowNum; i++ { + v := rand.Intn(100) + vals[i] = v + insertStr += fmt.Sprintf(", (%d)", v) + } + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int);") + tk.MustExec(insertStr) + tk.MustExec("set tidb_mem_quota_query = 40000;") + tk.MustExec("set cte_max_recursion_depth = 500000;") + sql := fmt.Sprintf("with recursive cte1 as ( "+ + "select c1 from t1 "+ + "union "+ + "select c1 + 1 c1 from cte1 where c1 < %d) "+ + "select c1 from cte1 order by c1;", rowNum) + rows := tk.MustQuery(sql) + + memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker + diskTracker := tk.Session().GetSessionVars().StmtCtx.DiskTracker + require.Greater(t, memTracker.MaxConsumed(), int64(0)) + require.Greater(t, diskTracker.MaxConsumed(), int64(0)) + + slices.Sort(vals) + resRows := make([]string, 0, rowNum) + for i := vals[0]; i <= rowNum; i++ { + resRows = append(resRows, fmt.Sprintf("%d", i)) + } + rows.Check(testkit.Rows(resRows...)) +} + +func TestCTEExecError(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists src;") + tk.MustExec("create table src(first int, second int);") + + insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000)) + for i := 0; i < 1000; i++ { + insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000)) + } + insertStr += ";" + tk.MustExec(insertStr) + + // Increase projection concurrency and decrease chunk size + // to increase the probability of reproducing the problem. + tk.MustExec("set tidb_max_chunk_size = 32") + tk.MustExec("set tidb_projection_concurrency = 20") + for i := 0; i < 10; i++ { + err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " + + "(select 1, first, second, first+second from src " + + " union all " + + "select iter+1, second, result, second+result from cte where iter < 80 )" + + "select * from cte") + require.True(t, terror.ErrorEqual(err, types.ErrOverflow)) + } +} + +func TestCTEPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t1(c1 int)") + tk.MustExec("insert into t1 values(1), (2), (3)") + + fpPathPrefix := "github.com/pingcap/tidb/pkg/executor/" + fp := "testCTESeedPanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) + + fp = "testCTERecursivePanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) +} + +func TestCTEDelSpillFile(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 int);") + tk.MustExec("create table t2(c1 int);") + tk.MustExec("set @@cte_max_recursion_depth = 1000000;") + tk.MustExec("set global tidb_mem_oom_action = 'log';") + tk.MustExec("set @@tidb_mem_quota_query = 100;") + tk.MustExec("insert into t2 values(1);") + tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;") + require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap) +} + +func TestCTEShareCorColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 varchar(100));") + tk.MustExec("insert into t1 values(1, '2020-10-10');") + tk.MustExec("create table t2(c1 int, c2 date);") + tk.MustExec("insert into t2 values(1, '2020-10-10');") + for i := 0; i < 100; i++ { + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + } + + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(a int);") + tk.MustExec("insert into t1 values(1), (2);") + tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn dtqn1 where exists (select /*+ NO_DECORRELATE() */ b from qn where dtqn1.b+1));").Check(testkit.Rows("1", "2")) +} diff --git a/util/cteutil/BUILD.bazel b/util/cteutil/BUILD.bazel index 86a9174e45809..45c95e17e30ee 100644 --- a/util/cteutil/BUILD.bazel +++ b/util/cteutil/BUILD.bazel @@ -6,10 +6,18 @@ go_library( importpath = "github.com/pingcap/tidb/util/cteutil", visibility = ["//visibility:public"], deps = [ +<<<<<<< HEAD:util/cteutil/BUILD.bazel "//types", "//util/chunk", "//util/disk", "//util/memory", +======= + "//pkg/types", + "//pkg/util/chunk", + "//pkg/util/disk", + "//pkg/util/memory", + "//pkg/util/syncutil", +>>>>>>> 0c7659c1907 (executor: fix deadlock in dml statement with cte when oom panic action was triggered (#49192)):pkg/util/cteutil/BUILD.bazel "@com_github_pingcap_errors//:errors", ], ) diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index dea6fd632e42b..2324e56fecd8c 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -15,13 +15,19 @@ package cteutil import ( - "sync" - "github.com/pingcap/errors" +<<<<<<< HEAD:util/cteutil/storage.go "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/memory" +======= + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/disk" + "github.com/pingcap/tidb/pkg/util/memory" + "github.com/pingcap/tidb/pkg/util/syncutil" +>>>>>>> 0c7659c1907 (executor: fix deadlock in dml statement with cte when oom panic action was triggered (#49192)):pkg/util/cteutil/storage.go ) var _ Storage = &StorageRC{} @@ -99,7 +105,7 @@ type StorageRC struct { refCnt int chkSize int iter int - mu sync.Mutex + mu syncutil.Mutex done bool }