From 02d5bf5b582ffc013311d0453401dda7f6f1159b Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 14 Dec 2021 12:14:12 +0800 Subject: [PATCH 1/3] executor: add an unit test case for unreasonable invoking Close --- executor/builder.go | 17 +++++ executor/cte.go | 6 +- executor/distsql.go | 6 +- executor/executor_test.go | 136 ++++++++++++++++++++++++++++++++++++++ executor/join.go | 4 +- executor/merge_join.go | 5 ++ 6 files changed, 169 insertions(+), 5 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 109cc49915d58..b7fdbbc313143 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -124,6 +124,23 @@ type MockPhysicalPlan interface { GetExecutor() Executor } +// MockExecutorBuilder is a wrapper for executorBuilder. +// ONLY used in test. +type MockExecutorBuilder struct { + *executorBuilder +} + +// NewMockExecutorBuilderForTest is ONLY used in test. +func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder { + return &MockExecutorBuilder{ + executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)} +} + +// Build builds an executor tree according to `p`. +func (b *MockExecutorBuilder) Build(p plannercore.Plan) Executor { + return b.build(p) +} + func (b *executorBuilder) build(p plannercore.Plan) Executor { switch v := p.(type) { case nil: diff --git a/executor/cte.go b/executor/cte.go index 3ce82c3920559..8345bf5e57f5d 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -204,8 +204,10 @@ func (e *CTEExec) Close() (err error) { } // `iterInTbl` and `resTbl` are shared by multiple operators, // so will be closed when the SQL finishes. - if err = e.iterOutTbl.DerefAndClose(); err != nil { - return err + if e.iterOutTbl != nil { + if err = e.iterOutTbl.DerefAndClose(); err != nil { + return err + } } } diff --git a/executor/distsql.go b/executor/distsql.go index 3edb1dd709168..1c31130dc53b5 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -197,12 +197,14 @@ type IndexReaderExecutor struct { } // Close clears all resources hold by current object. -func (e *IndexReaderExecutor) Close() error { +func (e *IndexReaderExecutor) Close() (err error) { if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } - err := e.result.Close() + if e.result != nil { + err = e.result.Close() + } e.result = nil e.ctx.StoreQueryFeedback(e.feedback) return err diff --git a/executor/executor_test.go b/executor/executor_test.go index c93aa09e6ccdf..d80055d13091c 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -24,6 +24,8 @@ import ( "net" "os" "path/filepath" + "reflect" + "runtime" "strconv" "strings" "sync" @@ -80,6 +82,7 @@ import ( "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/testutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-tipb" @@ -9501,3 +9504,136 @@ func (s *testSerialSuite) TestIssue30289(c *C) { err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") c.Assert(err.Error(), Matches, "issue30289 build return error") } + +// Test invoke Close without invoking Open before for each operators. +func (s *testSerialSuite) TestUnreasonablyClose(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + defer func() { + dom.Close() + store.Close() + }() + + is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) + se, err := session.CreateSession4Test(store) + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "use test") + c.Assert(err, IsNil) + // To enable the shuffleExec operator. + _, err = se.Execute(context.Background(), "set @@tidb_merge_join_concurrency=4") + c.Assert(err, IsNil) + + var opsNeedsCovered = []plannercore.PhysicalPlan{ + &plannercore.PhysicalHashJoin{}, + &plannercore.PhysicalMergeJoin{}, + &plannercore.PhysicalIndexJoin{}, + &plannercore.PhysicalIndexHashJoin{}, + &plannercore.PhysicalTableReader{}, + &plannercore.PhysicalIndexReader{}, + &plannercore.PhysicalIndexLookUpReader{}, + &plannercore.PhysicalIndexMergeReader{}, + &plannercore.PhysicalApply{}, + &plannercore.PhysicalHashAgg{}, + &plannercore.PhysicalStreamAgg{}, + &plannercore.PhysicalLimit{}, + &plannercore.PhysicalSort{}, + &plannercore.PhysicalTopN{}, + &plannercore.PhysicalCTE{}, + &plannercore.PhysicalCTETable{}, + &plannercore.PhysicalMaxOneRow{}, + &plannercore.PhysicalProjection{}, + &plannercore.PhysicalSelection{}, + &plannercore.PhysicalTableDual{}, + &plannercore.PhysicalWindow{}, + &plannercore.PhysicalShuffle{}, + &plannercore.PhysicalUnionAll{}, + } + executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") + + var opsNeedsCoveredMask uint64 = 1< t1.a) AS a from t as t1) t", + "select /*+ hash_agg() */ count(f) from t group by a", + "select /*+ stream_agg() */ count(f) from t group by a", + "select * from t order by a, f", + "select * from t order by a, f limit 1", + "select * from t limit 1", + "select (select t1.a from t t1 where t1.a > t2.a) as a from t t2;", + "select a + 1 from t", + "select count(*) a from t having a > 1", + "select * from t where a = 1.1", + "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", + "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", + "select sum(f) over (partition by f) from t", + "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d", + "select a from t union all select a from t", + } { + comment := Commentf("case:%v sql:%s", i, tc) + c.Assert(err, IsNil, comment) + stmt, err := s.ParseOneStmt(tc, "", "") + c.Assert(err, IsNil, comment) + + err = se.NewTxn(context.Background()) + c.Assert(err, IsNil, comment) + p, _, err := planner.Optimize(context.TODO(), se, stmt, is) + c.Assert(err, IsNil, comment) + // This for loop level traverses the plan tree to get which operators are covered. + for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; { + newChild := make([]plannercore.PhysicalPlan, 0, len(child)) + for _, ch := range child { + found := false + for k, t := range opsNeedsCovered { + if reflect.TypeOf(t) == reflect.TypeOf(ch) { + opsAlreadyCoveredMask |= 1 << k + found = true + break + } + } + c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) + switch x := ch.(type) { + case *plannercore.PhysicalCTE: + newChild = append(newChild, x.RecurPlan) + newChild = append(newChild, x.SeedPlan) + continue + case *plannercore.PhysicalShuffle: + newChild = append(newChild, x.DataSources...) + newChild = append(newChild, x.Tails...) + continue + } + newChild = append(newChild, ch.Children()...) + } + child = newChild + } + + e := executorBuilder.Build(p) + + func() { + defer func() { + r := recover() + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + c.Assert(r, IsNil, Commentf("case: %v\n sql: %s\n error stack: %v", i, tc, string(buf))) + }() + c.Assert(e.Close(), IsNil, comment) + }() + } + // The following code is used to make sure all the operators registered + // in opsNeedsCoveredMask are covered. + commentBuf := strings.Builder{} + if opsAlreadyCoveredMask != opsNeedsCoveredMask { + for i := range opsNeedsCovered { + if opsAlreadyCoveredMask&(1< Date: Tue, 14 Dec 2021 14:49:37 +0800 Subject: [PATCH 2/3] refine --- executor/executor_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index d80055d13091c..ef4f434a9fb67 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9508,15 +9508,9 @@ func (s *testSerialSuite) TestIssue30289(c *C) { // Test invoke Close without invoking Open before for each operators. func (s *testSerialSuite) TestUnreasonablyClose(c *C) { defer testleak.AfterTest(c)() - store, dom, err := newStoreWithBootstrap() - c.Assert(err, IsNil) - defer func() { - dom.Close() - store.Close() - }() is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) - se, err := session.CreateSession4Test(store) + se, err := session.CreateSession4Test(s.store) c.Assert(err, IsNil) _, err = se.Execute(context.Background(), "use test") c.Assert(err, IsNil) From c834c11e2daac7c95c7351e5e5e645bc6e6bbf95 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 14 Dec 2021 15:23:49 +0800 Subject: [PATCH 3/3] address comment --- executor/join.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executor/join.go b/executor/join.go index 164d108d74f21..09990e6c5d4b1 100644 --- a/executor/join.go +++ b/executor/join.go @@ -158,6 +158,9 @@ func (e *HashJoinExec) Close() error { // Open implements the Executor Open interface. func (e *HashJoinExec) Open(ctx context.Context) error { + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) @@ -181,7 +184,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error { } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } - return e.baseExecutor.Open(ctx) + return nil } // fetchProbeSideChunks get chunks from fetches chunks from the big table in a background goroutine