diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index b31299e161d39..d3ce9c17c97c7 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -49,6 +49,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" + contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/mock" "github.com/pingcap/tidb/pkg/util/ranger" @@ -86,9 +87,11 @@ func newReorgExprCtx() exprctx.ExprContext { contextstatic.WithErrLevelMap(stmtctx.DefaultStmtErrLevels), ) + planCacheTracker := contextutil.NewPlanCacheTracker(contextutil.IgnoreWarn) + return contextstatic.NewStaticExprContext( contextstatic.WithEvalCtx(evalCtx), - contextstatic.WithUseCache(false), + contextstatic.WithPlanCacheTracker(&planCacheTracker), ) } diff --git a/pkg/distsql/context/BUILD.bazel b/pkg/distsql/context/BUILD.bazel index 6ab89d5a7422a..a4bc52e6cfa1f 100644 --- a/pkg/distsql/context/BUILD.bazel +++ b/pkg/distsql/context/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "//pkg/util/context", "//pkg/util/execdetails", "//pkg/util/memory", - "//pkg/util/nocopy", "//pkg/util/sqlkiller", "//pkg/util/tiflash", "//pkg/util/topsql/stmtstats", diff --git a/pkg/distsql/context/context.go b/pkg/distsql/context/context.go index 797ec00666126..e113bc2024f04 100644 --- a/pkg/distsql/context/context.go +++ b/pkg/distsql/context/context.go @@ -24,7 +24,6 @@ import ( contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/memory" - "github.com/pingcap/tidb/pkg/util/nocopy" "github.com/pingcap/tidb/pkg/util/sqlkiller" "github.com/pingcap/tidb/pkg/util/tiflash" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" @@ -34,11 +33,6 @@ import ( // DistSQLContext provides all information needed by using functions in `distsql` type DistSQLContext struct { - // TODO: provide a `Clone` to copy this struct. - // The life cycle of some fields in this struct cannot be extended. For example, some fields will be recycled before - // the next execution. They'll need to be handled specially. - _ nocopy.NoCopy - WarnHandler contextutil.WarnAppender InRestrictedSQL bool @@ -95,3 +89,23 @@ type DistSQLContext struct { func (dctx *DistSQLContext) AppendWarning(warn error) { dctx.WarnHandler.AppendWarning(warn) } + +// Detach detaches this context from the session context. +// +// NOTE: Though this session context can be used parallelly with this context after calling +// it, the `StatementContext` cannot. The session context should create a new `StatementContext` +// before executing another statement. +func (dctx *DistSQLContext) Detach() *DistSQLContext { + newCtx := *dctx + + // TODO: using the same `SQLKiller` is actually not that meaningful. The `SQLKiller` will be reset before the + // execution of each statements, so that if the running statement is killed, the background cursor will also + // be affected (but it's not guaranteed). However, we don't provide an interface to `KILL` the background + // cursor, so that it's still good to provide at least one way to stop it. + // In the future, we should provide a more constant behavior for killing the cursor. + newCtx.SQLKiller = dctx.SQLKiller + newCtx.KVVars = new(tikvstore.Variables) + *newCtx.KVVars = *dctx.KVVars + newCtx.KVVars.Killed = &newCtx.SQLKiller.Signal + return &newCtx +} diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 6b0c8d2cdb564..9bd50bf39149d 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "cte_table_reader.go", "ddl.go", "delete.go", + "detach.go", "distsql.go", "executor.go", "explain.go", @@ -126,6 +127,7 @@ go_library( "//pkg/expression", "//pkg/expression/aggregation", "//pkg/expression/context", + "//pkg/expression/contextsession", "//pkg/extension", "//pkg/infoschema", "//pkg/infoschema/context", @@ -309,6 +311,8 @@ go_test( "compact_table_test.go", "copr_cache_test.go", "delete_test.go", + "detach_integration_test.go", + "detach_test.go", "distsql_test.go", "executor_failpoint_test.go", "executor_pkg_test.go", @@ -389,6 +393,7 @@ go_test( "//pkg/executor/sortexec", "//pkg/expression", "//pkg/expression/aggregation", + "//pkg/expression/contextstatic", "//pkg/extension", "//pkg/infoschema", "//pkg/kv", @@ -452,6 +457,7 @@ go_test( "//pkg/util/ranger", "//pkg/util/sem", "//pkg/util/set", + "//pkg/util/sqlexec", "//pkg/util/sqlkiller", "//pkg/util/stmtsummary/v2:stmtsummary", "//pkg/util/stringutil", diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index 2aba9fce14c73..8ed4675614cef 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -235,15 +235,18 @@ func (a *recordSet) OnFetchReturned() { // TryDetach creates a new `RecordSet` which doesn't depend on the current session context. func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) { - // TODO: also detach the executor. Currently, the executor inside may contain the session context. Once - // the executor itself supports detach, we should also detach it here. - e, ok := a.executor.(*TableReaderExecutor) + e, ok := Detach(a.executor) if !ok { return nil, false, nil } return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil } +// GetExecutor4Test exports the internal executor for test purpose. +func (a *recordSet) GetExecutor4Test() any { + return a.executor +} + // ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement. type ExecStmt struct { // GoCtx stores parent go context.Context for a stmt. diff --git a/pkg/executor/detach.go b/pkg/executor/detach.go new file mode 100644 index 0000000000000..b65de754f1f09 --- /dev/null +++ b/pkg/executor/detach.go @@ -0,0 +1,74 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/expression/contextsession" +) + +// Detach detaches the current executor from the session context. After detaching, the session context +// can be used to execute another statement while this executor is still running. The returning value +// shows whether this executor is able to be detached. +// +// NOTE: the implementation of `Detach` should guarantee that no matter whether it returns true or false, +// both the original executor and the returning executor should be able to be used correctly. This restriction +// is to make sure that if `Detach(a)` returns `true`, while other children of `a`'s parent returns `false`, +// the caller can still use the original one. +func Detach(originalExecutor exec.Executor) (exec.Executor, bool) { + newExecutor, ok := originalExecutor.Detach() + if !ok { + return nil, false + } + + children := originalExecutor.AllChildren() + newChildren := make([]exec.Executor, len(children)) + for i, child := range children { + detached, ok := Detach(child) + if !ok { + return nil, false + } + newChildren[i] = detached + } + newExecutor.SetAllChildren(newChildren) + + return newExecutor, true +} + +func (treCtx tableReaderExecutorContext) Detach() tableReaderExecutorContext { + newCtx := treCtx + + if ctx, ok := treCtx.ectx.(*contextsession.SessionExprContext); ok { + staticExprCtx := ctx.IntoStatic() + + newCtx.dctx = newCtx.dctx.Detach() + newCtx.rctx = newCtx.rctx.Detach(staticExprCtx) + newCtx.buildPBCtx = newCtx.buildPBCtx.Detach(staticExprCtx) + newCtx.ectx = staticExprCtx + return newCtx + } + + return treCtx +} + +// Detach detaches the current executor from the session context. +func (e *TableReaderExecutor) Detach() (exec.Executor, bool) { + newExec := new(TableReaderExecutor) + *newExec = *e + + newExec.tableReaderExecutorContext = newExec.tableReaderExecutorContext.Detach() + + return newExec, true +} diff --git a/pkg/executor/detach_integration_test.go b/pkg/executor/detach_integration_test.go new file mode 100644 index 0000000000000..df3d81bc0831d --- /dev/null +++ b/pkg/executor/detach_integration_test.go @@ -0,0 +1,174 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "context" + "strconv" + "sync" + "sync/atomic" + "testing" + + "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "github.com/stretchr/testify/require" +) + +type exportExecutor interface { + GetExecutor4Test() any +} + +func TestDetachAllContexts(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true) + tk.MustExec("create table t (a int)") + tk.MustExec("insert into t values (1), (2), (3)") + + rs, err := tk.Exec("select * from t") + require.NoError(t, err) + oldExecutor := rs.(exportExecutor).GetExecutor4Test().(exec.Executor) + + drs := rs.(sqlexec.DetachableRecordSet) + srs, ok, err := drs.TryDetach() + require.True(t, ok) + require.NoError(t, err) + + require.NotEqual(t, rs, srs) + newExecutor := srs.(exportExecutor).GetExecutor4Test().(exec.Executor) + + require.NotEqual(t, oldExecutor, newExecutor) + // Children should be different + for i, child := range oldExecutor.AllChildren() { + require.NotEqual(t, child, newExecutor.AllChildren()[i]) + } + + // Then execute another statement + tk.MustQuery("select * from t limit 1").Check(testkit.Rows("1")) + // The previous detached record set can still be used + // check data + chk := srs.NewChunk(nil) + err = srs.Next(context.Background(), chk) + require.NoError(t, err) + require.Equal(t, 3, chk.NumRows()) + require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0)) + require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0)) + require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0)) +} + +func TestAfterDetachSessionCanExecute(t *testing.T) { + // This test shows that the session can be safely used to execute another statement after detaching. + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true) + tk.MustExec("create table t (a int)") + for i := 0; i < 10000; i++ { + tk.MustExec("insert into t values (?)", i) + } + + rs, err := tk.Exec("select * from t") + require.NoError(t, err) + drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach() + require.NoError(t, err) + require.True(t, ok) + + // Now, the `drs` can be used concurrently with the session. + var wg sync.WaitGroup + var stop atomic.Bool + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 10000; i++ { + if stop.Load() { + return + } + tk.MustQuery("select * from t where a = ?", i).Check(testkit.Rows(strconv.Itoa(i))) + } + }() + + chk := drs.NewChunk(nil) + expectedSelect := 0 + for { + err = drs.Next(context.Background(), chk) + require.NoError(t, err) + + if chk.NumRows() == 0 { + break + } + for i := 0; i < chk.NumRows(); i++ { + require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0)) + expectedSelect++ + } + } + stop.Store(true) + wg.Wait() +} + +func TestDetachWithParam(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true) + tk.MustExec("create table t (a int primary key)") + for i := 0; i < 10000; i++ { + tk.MustExec("insert into t values (?)", i) + } + + rs, err := tk.Exec("select * from t where a > ? and a < ?", 100, 200) + require.NoError(t, err) + drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach() + require.NoError(t, err) + require.True(t, ok) + + // Now, execute another statement with different size of param. It'll not affect the execution of detached executor. + var wg sync.WaitGroup + var stop atomic.Bool + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 10000; i++ { + if stop.Load() { + return + } + tk.MustQuery("select * from t where a = ?", i).Check(testkit.Rows(strconv.Itoa(i))) + } + }() + + chk := drs.NewChunk(nil) + expectedSelect := 101 + for { + err = drs.Next(context.Background(), chk) + require.NoError(t, err) + + if chk.NumRows() == 0 { + break + } + for i := 0; i < chk.NumRows(); i++ { + require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0)) + expectedSelect++ + } + } + stop.Store(true) + wg.Wait() +} diff --git a/pkg/executor/detach_test.go b/pkg/executor/detach_test.go new file mode 100644 index 0000000000000..7f46f5479e7f8 --- /dev/null +++ b/pkg/executor/detach_test.go @@ -0,0 +1,72 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/expression/contextstatic" + "github.com/pingcap/tidb/pkg/util/mock" + "github.com/stretchr/testify/require" +) + +type mockSimpleExecutor struct { + exec.BaseExecutorV2 +} + +func TestDetachExecutor(t *testing.T) { + // call `Detach` on a mock executor will fail + _, ok := Detach(&mockSimpleExecutor{}) + require.False(t, ok) + + // call `Detach` on a TableReaderExecutor will succeed + oldExec := &TableReaderExecutor{ + tableReaderExecutorContext: tableReaderExecutorContext{ + ectx: contextstatic.NewStaticExprContext(), + }, + } + newExec, ok := Detach(oldExec) + require.True(t, ok) + require.NotSame(t, oldExec, newExec) + + // call `Detach` on a `TableReaderExecutor` with `mockSimpleExecutor` as child will fail + sess := mock.NewContext() + oldExec = &TableReaderExecutor{ + tableReaderExecutorContext: tableReaderExecutorContext{ + ectx: contextstatic.NewStaticExprContext(), + }, + BaseExecutorV2: exec.NewBaseExecutorV2(sess.GetSessionVars(), nil, 0, &mockSimpleExecutor{}), + } + _, ok = Detach(oldExec) + require.False(t, ok) + + // call `Detach` on a `TableReaderExecutor` with another `TableReaderExecutor` as child will succeed + child := &TableReaderExecutor{ + tableReaderExecutorContext: tableReaderExecutorContext{ + ectx: contextstatic.NewStaticExprContext(), + }, + } + parent := &TableReaderExecutor{ + tableReaderExecutorContext: tableReaderExecutorContext{ + ectx: contextstatic.NewStaticExprContext(), + }, + BaseExecutorV2: exec.NewBaseExecutorV2(sess.GetSessionVars(), nil, 0, child), + } + newExec, ok = Detach(parent) + require.True(t, ok) + require.NotSame(t, parent, newExec) + require.NotSame(t, child, newExec.AllChildren()[0]) +} diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go index ccd4cd4d5ff30..d9a0a30cd8978 100644 --- a/pkg/executor/internal/exec/executor.go +++ b/pkg/executor/internal/exec/executor.go @@ -57,6 +57,7 @@ type Executor interface { RegisterSQLAndPlanInExecForTopSQL() AllChildren() []Executor + SetAllChildren([]Executor) Open(context.Context) error Next(ctx context.Context, req *chunk.Chunk) error @@ -66,6 +67,12 @@ type Executor interface { RetFieldTypes() []*types.FieldType InitCap() int MaxChunkSize() int + + // Detach detaches the current executor from the session context without considering its children. + // + // It has to make sure, no matter whether it returns true or false, both the original executor and the returning executor + // should be able to be used correctly. + Detach() (Executor, bool) } var _ Executor = &BaseExecutor{} @@ -164,6 +171,11 @@ func (e *executorMeta) AllChildren() []Executor { return e.children } +// SetAllChildren sets the children for an executor. +func (e *executorMeta) SetAllChildren(children []Executor) { + e.children = children +} + // ChildrenLen returns the length of children. func (e *executorMeta) ChildrenLen() int { return len(e.children) @@ -174,7 +186,7 @@ func (e *executorMeta) EmptyChildren() bool { return len(e.children) == 0 } -// SetChildren sets the children for an executor. +// SetChildren sets a child for an executor. func (e *executorMeta) SetChildren(idx int, ex Executor) { e.children[idx] = ex } @@ -309,6 +321,11 @@ func (*BaseExecutorV2) Next(_ context.Context, _ *chunk.Chunk) error { return nil } +// Detach detaches the current executor from the session context. +func (*BaseExecutorV2) Detach() (Executor, bool) { + return nil, false +} + // BaseExecutor holds common information for executors. type BaseExecutor struct { ctx sessionctx.Context diff --git a/pkg/executor/staticrecordset/cursorrecordset.go b/pkg/executor/staticrecordset/cursorrecordset.go index 59379c8a321c2..98c87cb8343b7 100644 --- a/pkg/executor/staticrecordset/cursorrecordset.go +++ b/pkg/executor/staticrecordset/cursorrecordset.go @@ -49,6 +49,11 @@ func (c *cursorRecordSet) Close() error { return c.recordSet.Close() } +// GetExecutor4Test exports the internal executor for test purpose. +func (c *cursorRecordSet) GetExecutor4Test() any { + return c.recordSet.(interface{ GetExecutor4Test() any }).GetExecutor4Test() +} + // WrapRecordSetWithCursor wraps a record set with a cursor handle. The cursor handle will be closed // automatically when the record set is closed func WrapRecordSetWithCursor(cursor cursor.Handle, recordSet sqlexec.RecordSet) sqlexec.RecordSet { diff --git a/pkg/executor/staticrecordset/recordset.go b/pkg/executor/staticrecordset/recordset.go index 973b868f79e7a..78cef1a5b5f0c 100644 --- a/pkg/executor/staticrecordset/recordset.go +++ b/pkg/executor/staticrecordset/recordset.go @@ -77,3 +77,8 @@ func (s *staticRecordSet) Close() error { return err } + +// GetExecutor4Test exports the internal executor for test purpose. +func (s *staticRecordSet) GetExecutor4Test() any { + return s.executor +} diff --git a/pkg/expression/contextsession/BUILD.bazel b/pkg/expression/contextsession/BUILD.bazel index 38353c17faad7..e333ef78b8d3b 100644 --- a/pkg/expression/contextsession/BUILD.bazel +++ b/pkg/expression/contextsession/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/errctx", "//pkg/expression/context", "//pkg/expression/contextopt", + "//pkg/expression/contextstatic", "//pkg/infoschema/context", "//pkg/parser/auth", "//pkg/parser/model", diff --git a/pkg/expression/contextsession/sessionctx.go b/pkg/expression/contextsession/sessionctx.go index eb864cb7f8d0a..9b73f583eda4b 100644 --- a/pkg/expression/contextsession/sessionctx.go +++ b/pkg/expression/contextsession/sessionctx.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/pkg/errctx" exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/expression/contextopt" + "github.com/pingcap/tidb/pkg/expression/contextstatic" infoschema "github.com/pingcap/tidb/pkg/infoschema/context" "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/model" @@ -136,6 +137,26 @@ func (ctx *SessionExprContext) ConnectionID() uint64 { return ctx.sctx.GetSessionVars().ConnectionID } +// IntoStatic turns the SessionExprContext into a StaticExprContext. +func (ctx *SessionExprContext) IntoStatic() *contextstatic.StaticExprContext { + staticEvalContext := ctx.SessionEvalContext.IntoStatic() + return contextstatic.NewStaticExprContext( + contextstatic.WithEvalCtx(staticEvalContext), + contextstatic.WithCharset(ctx.GetCharsetInfo()), + contextstatic.WithDefaultCollationForUTF8MB4(ctx.GetDefaultCollationForUTF8MB4()), + contextstatic.WithBlockEncryptionMode(ctx.GetBlockEncryptionMode()), + contextstatic.WithSysDateIsNow(ctx.GetSysdateIsNow()), + contextstatic.WithNoopFuncsMode(ctx.GetNoopFuncsMode()), + contextstatic.WithRng(ctx.Rng()), + contextstatic.WithPlanCacheTracker(&ctx.sctx.GetSessionVars().StmtCtx.PlanCacheTracker), + contextstatic.WithColumnIDAllocator( + exprctx.NewSimplePlanColumnIDAllocator(ctx.sctx.GetSessionVars().PlanColumnID.Load())), + contextstatic.WithConnectionID(ctx.ConnectionID()), + contextstatic.WithWindowingUseHighPrecision(ctx.GetWindowingUseHighPrecision()), + contextstatic.WithGroupConcatMaxLen(ctx.GetGroupConcatMaxLen()), + ) +} + // SessionEvalContext implements the `expression.EvalContext` interface to provide evaluation context in session. type SessionEvalContext struct { sctx sessionctx.Context @@ -287,6 +308,72 @@ func (ctx *SessionEvalContext) GetParamValue(idx int) (types.Datum, error) { return params[idx], nil } +// IntoStatic turns the SessionEvalContext into a StaticEvalContext. +func (ctx *SessionEvalContext) IntoStatic() *contextstatic.StaticEvalContext { + typeCtx := ctx.TypeCtx() + errCtx := ctx.ErrCtx() + + // TODO: at least provide some optional eval prop provider which is suitable to be used in the static context. + props := make([]exprctx.OptionalEvalPropProvider, 0, exprctx.OptPropsCnt) + + params := variable.NewPlanCacheParamList() + for _, param := range ctx.sctx.GetSessionVars().PlanCacheParams.AllParamValues() { + params.Append(param) + } + + // TODO: use a more structural way to replace the closure. + // These closure makes sure the fields which may be changed in the execution of the next statement will not be embedded into them, to make + // sure it's safe to call them after the session continues to execute other statements. + staticCtx := contextstatic.NewStaticEvalContext( + contextstatic.WithWarnHandler(ctx.sctx.GetSessionVars().StmtCtx.WarnHandler), + contextstatic.WithSQLMode(ctx.SQLMode()), + contextstatic.WithTypeFlags(typeCtx.Flags()), + contextstatic.WithLocation(typeCtx.Location()), + contextstatic.WithErrLevelMap(errCtx.LevelMap()), + contextstatic.WithCurrentDB(ctx.CurrentDB()), + contextstatic.WithCurrentTime(func() func() (time.Time, error) { + currentTime, currentTimeErr := ctx.CurrentTime() + + return func() (time.Time, error) { + return currentTime, currentTimeErr + } + }()), + contextstatic.WithMaxAllowedPacket(ctx.GetMaxAllowedPacket()), + contextstatic.WithDefaultWeekFormatMode(ctx.GetDefaultWeekFormatMode()), + contextstatic.WithDivPrecisionIncrement(ctx.GetDivPrecisionIncrement()), + contextstatic.WithPrivCheck(func() func(db string, table string, column string, priv mysql.PrivilegeType) bool { + checker := privilege.GetPrivilegeManager(ctx.sctx) + activeRoles := make([]*auth.RoleIdentity, len(ctx.sctx.GetSessionVars().ActiveRoles)) + copy(activeRoles, ctx.sctx.GetSessionVars().ActiveRoles) + + return func(db string, table string, column string, priv mysql.PrivilegeType) bool { + if checker == nil { + return true + } + + return checker.RequestVerification(activeRoles, db, table, column, priv) + } + }()), + contextstatic.WithDynamicPrivCheck(func() func(privName string, grantable bool) bool { + checker := privilege.GetPrivilegeManager(ctx.sctx) + activeRoles := make([]*auth.RoleIdentity, len(ctx.sctx.GetSessionVars().ActiveRoles)) + copy(activeRoles, ctx.sctx.GetSessionVars().ActiveRoles) + + return func(privName string, grantable bool) bool { + if checker == nil { + return true + } + + return checker.RequestDynamicVerification(activeRoles, privName, grantable) + } + }()), + contextstatic.WithParamList(params), + contextstatic.WithOptionalProperty(props...), + ) + + return staticCtx +} + func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) { if ctx != nil { staleTSO, err := ctx.GetSessionVars().StmtCtx.GetStaleTSO() diff --git a/pkg/expression/contextstatic/BUILD.bazel b/pkg/expression/contextstatic/BUILD.bazel index c32c9b255e3ec..b8060900ffe09 100644 --- a/pkg/expression/contextstatic/BUILD.bazel +++ b/pkg/expression/contextstatic/BUILD.bazel @@ -31,7 +31,7 @@ go_test( ], embed = [":contextstatic"], flaky = True, - shard_count = 10, + shard_count = 9, deps = [ "//pkg/errctx", "//pkg/expression/context", diff --git a/pkg/expression/contextstatic/exprctx.go b/pkg/expression/contextstatic/exprctx.go index 9dc432eb2aab7..21a6bac5d1891 100644 --- a/pkg/expression/contextstatic/exprctx.go +++ b/pkg/expression/contextstatic/exprctx.go @@ -15,12 +15,11 @@ package contextstatic import ( - "sync/atomic" - exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx/variable" + contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/mathutil" ) @@ -39,8 +38,7 @@ type staticExprCtxState struct { sysDateIsNow bool noopFuncsMode int rng *mathutil.MysqlRng - canUseCache *atomic.Bool - skipCacheHandleFunc func(useCache *atomic.Bool, skipReason string) + planCacheTracker *contextutil.PlanCacheTracker columnIDAllocator exprctx.PlanColumnIDAllocator connectionID uint64 windowingUseHighPrecision bool @@ -103,17 +101,11 @@ func WithRng(rng *mathutil.MysqlRng) StaticExprCtxOption { } } -// WithUseCache sets the return value of `IsUseCache` for `StaticExprContext`. -func WithUseCache(useCache bool) StaticExprCtxOption { - return func(s *staticExprCtxState) { - s.canUseCache.Store(useCache) - } -} - -// WithSkipCacheHandleFunc sets inner skip plan cache function for StaticExprContext -func WithSkipCacheHandleFunc(fn func(useCache *atomic.Bool, skipReason string)) StaticExprCtxOption { +// WithPlanCacheTracker sets the plan cache tracker for `StaticExprContext`. +func WithPlanCacheTracker(tracker *contextutil.PlanCacheTracker) StaticExprCtxOption { + intest.AssertNotNil(tracker) return func(s *staticExprCtxState) { - s.skipCacheHandleFunc = fn + s.planCacheTracker = tracker } } @@ -170,10 +162,6 @@ func NewStaticExprContext(opts ...StaticExprCtxOption) *StaticExprContext { groupConcatMaxLen: variable.DefGroupConcatMaxLen, }, } - - ctx.canUseCache = &atomic.Bool{} - ctx.canUseCache.Store(true) - for _, opt := range opts { opt(&ctx.staticExprCtxState) } @@ -190,6 +178,12 @@ func NewStaticExprContext(opts ...StaticExprCtxOption) *StaticExprContext { ctx.columnIDAllocator = exprctx.NewSimplePlanColumnIDAllocator(0) } + if ctx.planCacheTracker == nil { + cacheTracker := contextutil.NewPlanCacheTracker(ctx.evalCtx) + ctx.planCacheTracker = &cacheTracker + ctx.planCacheTracker.EnablePlanCache() + } + return ctx } @@ -199,9 +193,6 @@ func (ctx *StaticExprContext) Apply(opts ...StaticExprCtxOption) *StaticExprCont staticExprCtxState: ctx.staticExprCtxState, } - newCtx.canUseCache = &atomic.Bool{} - newCtx.canUseCache.Store(ctx.canUseCache.Load()) - for _, opt := range opts { opt(&newCtx.staticExprCtxState) } @@ -246,16 +237,12 @@ func (ctx *StaticExprContext) Rng() *mathutil.MysqlRng { // IsUseCache implements the `ExprContext.IsUseCache`. func (ctx *StaticExprContext) IsUseCache() bool { - return ctx.canUseCache.Load() + return ctx.planCacheTracker.UseCache() } // SetSkipPlanCache implements the `ExprContext.SetSkipPlanCache`. func (ctx *StaticExprContext) SetSkipPlanCache(reason string) { - if fn := ctx.skipCacheHandleFunc; fn != nil { - fn(ctx.canUseCache, reason) - return - } - ctx.canUseCache.Store(false) + ctx.planCacheTracker.SetSkipPlanCache(reason) } // AllocPlanColumnID implements the `ExprContext.AllocPlanColumnID`. diff --git a/pkg/expression/contextstatic/exprctx_test.go b/pkg/expression/contextstatic/exprctx_test.go index fa358402f1ed3..b4c945285c3db 100644 --- a/pkg/expression/contextstatic/exprctx_test.go +++ b/pkg/expression/contextstatic/exprctx_test.go @@ -15,7 +15,6 @@ package contextstatic import ( - "sync/atomic" "testing" "time" @@ -41,16 +40,13 @@ func TestNewStaticExprCtx(t *testing.T) { func TestStaticExprCtxApplyOptions(t *testing.T) { ctx := NewStaticExprContext() - oldCanUseCache := ctx.canUseCache oldEvalCtx := ctx.evalCtx oldColumnIDAllocator := ctx.columnIDAllocator // apply with options opts, s := getExprCtxOptionsForTest() ctx2 := ctx.Apply(opts...) - require.NotSame(t, oldCanUseCache, ctx2.canUseCache) require.Equal(t, oldEvalCtx, ctx.evalCtx) - require.Same(t, oldCanUseCache, ctx.canUseCache) require.Same(t, oldColumnIDAllocator, ctx.columnIDAllocator) checkDefaultStaticExprCtx(t, ctx) checkOptionsStaticExprCtx(t, ctx2, s) @@ -59,7 +55,6 @@ func TestStaticExprCtxApplyOptions(t *testing.T) { ctx3 := ctx2.Apply() s.skipCacheArgs = nil checkOptionsStaticExprCtx(t, ctx3, s) - require.NotSame(t, ctx2.canUseCache, ctx3.canUseCache) } func checkDefaultStaticExprCtx(t *testing.T, ctx *StaticExprContext) { @@ -76,7 +71,6 @@ func checkDefaultStaticExprCtx(t *testing.T, ctx *StaticExprContext) { require.Equal(t, variable.TiDBOptOnOffWarn(variable.DefTiDBEnableNoopFuncs), ctx.GetNoopFuncsMode()) require.NotNil(t, ctx.Rng()) require.True(t, ctx.IsUseCache()) - require.Nil(t, ctx.skipCacheHandleFunc) require.NotNil(t, ctx.columnIDAllocator) _, ok := ctx.columnIDAllocator.(*context.SimplePlanColumnIDAllocator) require.True(t, ok) @@ -98,6 +92,7 @@ func getExprCtxOptionsForTest() ([]StaticExprCtxOption, *exprCtxOptionsTestState colIDAlloc: context.NewSimplePlanColumnIDAllocator(1024), rng: mathutil.NewWithSeed(12345678), } + planCacheTracker := contextutil.NewPlanCacheTracker(s.evalCtx) return []StaticExprCtxOption{ WithEvalCtx(s.evalCtx), @@ -107,10 +102,7 @@ func getExprCtxOptionsForTest() ([]StaticExprCtxOption, *exprCtxOptionsTestState WithSysDateIsNow(true), WithNoopFuncsMode(variable.WarnInt), WithRng(s.rng), - WithUseCache(false), - WithSkipCacheHandleFunc(func(useCache *atomic.Bool, skipReason string) { - s.skipCacheArgs = []any{useCache, skipReason} - }), + WithPlanCacheTracker(&planCacheTracker), WithColumnIDAllocator(s.colIDAlloc), WithConnectionID(778899), WithWindowingUseHighPrecision(false), @@ -131,91 +123,12 @@ func checkOptionsStaticExprCtx(t *testing.T, ctx *StaticExprContext, s *exprCtxO require.False(t, ctx.IsUseCache()) require.Nil(t, s.skipCacheArgs) ctx.SetSkipPlanCache("reason") - require.Equal(t, []any{ctx.canUseCache, "reason"}, s.skipCacheArgs) require.Same(t, s.colIDAlloc, ctx.columnIDAllocator) require.Equal(t, uint64(778899), ctx.ConnectionID()) require.False(t, ctx.GetWindowingUseHighPrecision()) require.Equal(t, uint64(2233445566), ctx.GetGroupConcatMaxLen()) } -func TestStaticExprCtxUseCache(t *testing.T) { - // default implement - ctx := NewStaticExprContext() - require.True(t, ctx.IsUseCache()) - require.Nil(t, ctx.skipCacheHandleFunc) - ctx.SetSkipPlanCache("reason") - require.False(t, ctx.IsUseCache()) - require.Empty(t, ctx.GetEvalCtx().TruncateWarnings(0)) - - ctx = NewStaticExprContext(WithUseCache(false)) - require.False(t, ctx.IsUseCache()) - require.Nil(t, ctx.skipCacheHandleFunc) - ctx.SetSkipPlanCache("reason") - require.False(t, ctx.IsUseCache()) - require.Empty(t, ctx.GetEvalCtx().TruncateWarnings(0)) - - ctx = NewStaticExprContext(WithUseCache(true)) - require.True(t, ctx.IsUseCache()) - require.Nil(t, ctx.skipCacheHandleFunc) - ctx.SetSkipPlanCache("reason") - require.False(t, ctx.IsUseCache()) - require.Empty(t, ctx.GetEvalCtx().TruncateWarnings(0)) - - // custom skip func - var args []any - calls := 0 - ctx = NewStaticExprContext(WithSkipCacheHandleFunc(func(useCache *atomic.Bool, skipReason string) { - args = []any{useCache, skipReason} - calls++ - if calls > 1 { - useCache.Store(false) - } - })) - ctx.SetSkipPlanCache("reason1") - // If we use `WithSkipCacheHandleFunc`, useCache will be set in function - require.Equal(t, 1, calls) - require.True(t, ctx.IsUseCache()) - require.Equal(t, []any{ctx.canUseCache, "reason1"}, args) - - args = nil - ctx.SetSkipPlanCache("reason2") - require.Equal(t, 2, calls) - require.False(t, ctx.IsUseCache()) - require.Equal(t, []any{ctx.canUseCache, "reason2"}, args) - - // apply - ctx = NewStaticExprContext() - require.True(t, ctx.IsUseCache()) - ctx2 := ctx.Apply(WithUseCache(false)) - require.False(t, ctx2.IsUseCache()) - require.True(t, ctx.IsUseCache()) - require.NotSame(t, ctx.canUseCache, ctx2.canUseCache) - require.Nil(t, ctx.skipCacheHandleFunc) - require.Nil(t, ctx2.skipCacheHandleFunc) - - var args2 []any - fn1 := func(useCache *atomic.Bool, skipReason string) { args = []any{useCache, skipReason} } - fn2 := func(useCache *atomic.Bool, skipReason string) { args2 = []any{useCache, skipReason} } - ctx = NewStaticExprContext(WithUseCache(false), WithSkipCacheHandleFunc(fn1)) - require.False(t, ctx.IsUseCache()) - ctx2 = ctx.Apply(WithUseCache(true), WithSkipCacheHandleFunc(fn2)) - require.NotSame(t, ctx.canUseCache, ctx2.canUseCache) - require.False(t, ctx.IsUseCache()) - require.True(t, ctx2.IsUseCache()) - - args = nil - args2 = nil - ctx.SetSkipPlanCache("reasonA") - require.Equal(t, []any{ctx.canUseCache, "reasonA"}, args) - require.Nil(t, args2) - - args = nil - args2 = nil - ctx2.SetSkipPlanCache("reasonB") - require.Nil(t, args) - require.Equal(t, []any{ctx2.canUseCache, "reasonB"}, args2) -} - func TestExprCtxColumnIDAllocator(t *testing.T) { // default ctx := NewStaticExprContext() diff --git a/pkg/planner/context/context.go b/pkg/planner/context/context.go index 99b1ca4ed2495..e6252f8383c01 100644 --- a/pkg/planner/context/context.go +++ b/pkg/planner/context/context.go @@ -106,3 +106,14 @@ func (b *BuildPBContext) GetExprCtx() exprctx.BuildContext { func (b *BuildPBContext) GetClient() kv.Client { return b.Client } + +// Detach detaches this context from the session context. +// +// NOTE: Though this session context can be used parallelly with this context after calling +// it, the `StatementContext` cannot. The session context should create a new `StatementContext` +// before executing another statement. +func (b *BuildPBContext) Detach(staticExprCtx exprctx.BuildContext) *BuildPBContext { + newCtx := *b + newCtx.ExprCtx = staticExprCtx + return &newCtx +} diff --git a/pkg/session/session.go b/pkg/session/session.go index ff7d7854f9a16..6674458e0e418 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2418,6 +2418,11 @@ func (rs *execStmtResult) TryDetach() (sqlexec.RecordSet, bool, error) { return crs, true, nil } +// GetExecutor4Test exports the internal executor for test purpose. +func (rs *execStmtResult) GetExecutor4Test() any { + return rs.RecordSet.(interface{ GetExecutor4Test() any }).GetExecutor4Test() +} + // rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema. func (s *session) rollbackOnError(ctx context.Context) { if !s.sessionVars.InTxn() { diff --git a/pkg/util/ranger/context/context.go b/pkg/util/ranger/context/context.go index 626e924981e7c..940d7fa4e1cc2 100644 --- a/pkg/util/ranger/context/context.go +++ b/pkg/util/ranger/context/context.go @@ -34,3 +34,14 @@ type RangerContext struct { RegardNULLAsPoint bool OptPrefixIndexSingleScan bool } + +// Detach detaches this context from the session context. +// +// NOTE: Though this session context can be used parallelly with this context after calling +// it, the `StatementContext` cannot. The session context should create a new `StatementContext` +// before executing another statement. +func (r *RangerContext) Detach(staticExprCtx exprctx.BuildContext) *RangerContext { + newCtx := *r + newCtx.ExprCtx = staticExprCtx + return &newCtx +}