diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel
index 7594fdb610b1b..74fd5c6cbc14f 100644
--- a/domain/BUILD.bazel
+++ b/domain/BUILD.bazel
@@ -46,6 +46,7 @@ go_library(
         "//telemetry",
         "//types",
         "//util",
+        "//util/chunk",
         "//util/dbterror",
         "//util/domainutil",
         "//util/engine",
@@ -86,6 +87,7 @@ go_test(
         "domain_utils_test.go",
         "domainctx_test.go",
         "main_test.go",
+        "plan_replayer_handle_test.go",
         "plan_replayer_test.go",
         "schema_checker_test.go",
         "schema_validator_test.go",
@@ -109,6 +111,7 @@ go_test(
         "//session",
         "//sessionctx/variable",
         "//store/mockstore",
+        "//testkit",
         "//testkit/testsetup",
         "//util",
         "//util/mock",
diff --git a/domain/domain.go b/domain/domain.go
index cd256c581ab7b..cf94bee52a8bc 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -1533,12 +1533,46 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
 // SetupPlanReplayerHandle setup plan replayer handle
 func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
-    do.planReplayerHandle = &planReplayerHandle{
-        sctx: ctx,
-    }
+    do.planReplayerHandle = &planReplayerHandle{}
+    do.planReplayerHandle.sctxMu.sctx = ctx
     do.dumpFileGcChecker.setupPlanReplayerHandle(do.planReplayerHandle)
 }
 
+var planReplayerHandleLease = 10 * time.Second
+
+// DisablePlanReplayerBackgroundJob4Test disables the plan replayer background job for tests
+func DisablePlanReplayerBackgroundJob4Test() {
+    planReplayerHandleLease = 0
+}
+
+// StartPlanReplayerHandle starts the plan replayer handle job
+func (do *Domain) StartPlanReplayerHandle() {
+    if planReplayerHandleLease < 1 {
+        return
+    }
+    do.wg.Add(1)
+    go func() {
+        ticker := time.NewTicker(planReplayerHandleLease)
+        defer func() {
+            ticker.Stop()
+            do.wg.Done()
+            logutil.BgLogger().Info("PlanReplayerHandle exited.")
+            util.Recover(metrics.LabelDomain, "PlanReplayerHandle", nil, false)
+        }()
+        for {
+            select {
+            case <-do.exit:
+                return
+            case <-ticker.C:
+                err := do.planReplayerHandle.CollectPlanReplayerTask(context.Background())
+                if err != nil {
+                    logutil.BgLogger().Warn("plan replayer handle collect tasks failed", zap.Error(err))
+                }
+            }
+        }
+    }()
+}
+
 // GetPlanReplayerHandle returns plan replayer handle
 func (do *Domain) GetPlanReplayerHandle() *planReplayerHandle {
     return do.planReplayerHandle
diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go
index 0b13500be3a7b..37f3a4568c880 100644
--- a/domain/plan_replayer.go
+++ b/domain/plan_replayer.go
@@ -26,10 +26,16 @@ import (
     "time"
 
     "github.com/pingcap/errors"
+    "github.com/pingcap/tidb/bindinfo"
     "github.com/pingcap/tidb/config"
     "github.com/pingcap/tidb/domain/infosync"
     "github.com/pingcap/tidb/kv"
+    "github.com/pingcap/tidb/parser/ast"
+    "github.com/pingcap/tidb/parser/terror"
     "github.com/pingcap/tidb/sessionctx"
+    "github.com/pingcap/tidb/sessionctx/variable"
+    "github.com/pingcap/tidb/statistics/handle"
+    "github.com/pingcap/tidb/util/chunk"
     "github.com/pingcap/tidb/util/logutil"
     "github.com/pingcap/tidb/util/sqlexec"
     "go.uber.org/zap"
@@ -115,16 +121,23 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
 }
 
 type planReplayerHandle struct {
-    sync.Mutex
-    sctx sessionctx.Context
+    sctxMu struct {
+        sync.Mutex
+        sctx sessionctx.Context
+    }
+
+    taskMu struct {
+        sync.RWMutex
+        tasks map[PlanReplayerTaskKey]struct{}
+    }
 }
 
 // DeletePlanReplayerStatus delete mysql.plan_replayer_status record
 func (h *planReplayerHandle) deletePlanReplayerStatus(ctx context.Context, token string) {
     ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
-    h.Lock()
-    defer h.Unlock()
-    exec := h.sctx.(sqlexec.SQLExecutor)
+    h.sctxMu.Lock()
+    defer h.sctxMu.Unlock()
+    exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
     _, err := exec.ExecuteInternal(ctx1, fmt.Sprintf("delete from mysql.plan_replayer_status where token = %v", token))
     if err != nil {
         logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
@@ -154,9 +167,9 @@ func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, recor
 }
 
 func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
-    h.Lock()
-    defer h.Unlock()
-    exec := h.sctx.(sqlexec.SQLExecutor)
+    h.sctxMu.Lock()
+    defer h.sctxMu.Unlock()
+    exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
     _, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
         "insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
         record.OriginSQL, record.FailedReason, instance))
@@ -167,9 +180,9 @@ func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx con
 }
 
 func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
-    h.Lock()
-    defer h.Unlock()
-    exec := h.sctx.(sqlexec.SQLExecutor)
+    h.sctxMu.Lock()
+    defer h.sctxMu.Unlock()
+    exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
     _, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
         "insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
         record.OriginSQL, record.Token, instance))
@@ -179,6 +192,97 @@ func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx c
     }
 }
 
+// CollectPlanReplayerTask collects all unhandled plan replayer tasks
+func (h *planReplayerHandle) CollectPlanReplayerTask(ctx context.Context) error {
+    ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
+    allKeys, err := h.collectAllPlanReplayerTask(ctx1)
+    if err != nil {
+        return err
+    }
+    tasks := make([]PlanReplayerTaskKey, 0)
+    for _, key := range allKeys {
+        unhandled, err := h.checkUnHandledReplayerTask(ctx1, key)
+        if err != nil {
+            return err
+        }
+        if unhandled {
+            tasks = append(tasks, key)
+        }
+    }
+    h.setupTasks(tasks)
+    return nil
+}
+
+// GetTasks gets all tasks
+func (h *planReplayerHandle) GetTasks() []PlanReplayerTaskKey {
+    tasks := make([]PlanReplayerTaskKey, 0)
+    h.taskMu.RLock()
+    defer h.taskMu.RUnlock()
+    for taskKey := range h.taskMu.tasks {
+        tasks = append(tasks, taskKey)
+    }
+    return tasks
+}
+
+func (h *planReplayerHandle) setupTasks(tasks []PlanReplayerTaskKey) {
+    r := make(map[PlanReplayerTaskKey]struct{})
+    for _, task := range tasks {
+        r[task] = struct{}{}
+    }
+    h.taskMu.Lock()
+    defer h.taskMu.Unlock()
+    h.taskMu.tasks = r
+}
+
+func (h *planReplayerHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
+    h.sctxMu.Lock()
+    defer h.sctxMu.Unlock()
+    exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
+    rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
+    if err != nil {
+        return nil, err
+    }
+    if rs == nil {
+        return nil, nil
+    }
+    var rows []chunk.Row
+    defer terror.Call(rs.Close)
+    if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
+        return nil, errors.Trace(err)
+    }
+    allKeys := make([]PlanReplayerTaskKey, 0, len(rows))
+    for _, row := range rows {
+        sqlDigest, planDigest := row.GetString(0), row.GetString(1)
+        allKeys = append(allKeys, PlanReplayerTaskKey{
+            sqlDigest:  sqlDigest,
+            planDigest: planDigest,
+        })
+    }
+    return allKeys, nil
+}
+
+func (h *planReplayerHandle) checkUnHandledReplayerTask(ctx context.Context, task PlanReplayerTaskKey) (bool, error) {
+    h.sctxMu.Lock()
+    defer h.sctxMu.Unlock()
+    exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
+    rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest))
+    if err != nil {
+        return false, err
+    }
+    if rs == nil {
+        return true, nil
+    }
+    var rows []chunk.Row
+    defer terror.Call(rs.Close)
+    if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
+        return false, errors.Trace(err)
+    }
+    if len(rows) > 0 {
+        return false, nil
+    }
+    return true, nil
+}
+
 // PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status
 type PlanReplayerStatusRecord struct {
     Internal     bool
@@ -186,3 +290,21 @@ type PlanReplayerStatusRecord struct {
     Token        string
     FailedReason string
 }
+
+// PlanReplayerTaskKey indicates the key of a plan replayer task
+type PlanReplayerTaskKey struct {
+    sqlDigest  string
+    planDigest string
+}
+
+// PlanReplayerDumpTask wraps the params for plan replayer dump
+type PlanReplayerDumpTask struct {
+    SessionBindings []*bindinfo.BindRecord
+    EncodedPlan     string
+    FileName        string
+    Zf              *os.File
+    SessionVars     *variable.SessionVars
+    TblStats        map[int64]*handle.JSONTable
+    ExecStmts       []ast.StmtNode
+    Analyze         bool
+}
diff --git a/domain/plan_replayer_handle_test.go b/domain/plan_replayer_handle_test.go
new file mode 100644
index 0000000000000..2c25f56e15045
--- /dev/null
+++ b/domain/plan_replayer_handle_test.go
@@ -0,0 +1,64 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package domain_test
+
+import (
+    "context"
+    "testing"
+
+    "github.com/pingcap/tidb/testkit"
+    "github.com/stretchr/testify/require"
+)
+
+func TestPlanReplayerHandleCollectTask(t *testing.T) {
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    tk := testkit.NewTestKit(t, store)
+    prHandle := dom.GetPlanReplayerHandle()
+
+    // assert 1 task
+    tk.MustExec("delete from mysql.plan_replayer_task")
+    tk.MustExec("delete from mysql.plan_replayer_status")
+    tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
+    err := prHandle.CollectPlanReplayerTask(context.Background())
+    require.NoError(t, err)
+    require.Len(t, prHandle.GetTasks(), 1)
+
+    // assert no task
+    tk.MustExec("delete from mysql.plan_replayer_task")
+    tk.MustExec("delete from mysql.plan_replayer_status")
+    err = prHandle.CollectPlanReplayerTask(context.Background())
+    require.NoError(t, err)
+    require.Len(t, prHandle.GetTasks(), 0)
+
+    // assert 1 unhandled task
+    tk.MustExec("delete from mysql.plan_replayer_task")
+    tk.MustExec("delete from mysql.plan_replayer_status")
+    tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
+    tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');")
+    tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, token, instance) values ('123','123','123','123')")
+    err = prHandle.CollectPlanReplayerTask(context.Background())
+    require.NoError(t, err)
+    require.Len(t, prHandle.GetTasks(), 1)
+
+    // assert 2 unhandled tasks
+    tk.MustExec("delete from mysql.plan_replayer_task")
+    tk.MustExec("delete from mysql.plan_replayer_status")
+    tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
+    tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');")
+    tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, fail_reason, instance) values ('123','123','123','123')")
+    err = prHandle.CollectPlanReplayerTask(context.Background())
+    require.NoError(t, err)
+    require.Len(t, prHandle.GetTasks(), 2)
+}
diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go
index d2e230c6fd766..f00cd5bfc0ff1 100644
--- a/executor/infoschema_cluster_table_test.go
+++ b/executor/infoschema_cluster_table_test.go
@@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
         "test 2",
     ))
     rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
-    result := 38
+    result := 39
     require.Len(t, rows, result)
 
     // More tests about the privileges.
diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go
index d1a2c7a4c56e1..c3bbbab174274 100644
--- a/executor/plan_replayer.go
+++ b/executor/plan_replayer.go
@@ -226,7 +226,7 @@ func generatePlanReplayerFileName() (string, error) {
 func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
     fileName := e.FileName
     zf := e.File
-    task := &PlanReplayerDumpTask{
+    task := &domain.PlanReplayerDumpTask{
         FileName:    fileName,
         Zf:          zf,
         SessionVars: e.ctx.GetSessionVars(),
@@ -242,18 +242,6 @@ func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
     return nil
 }
 
-// PlanReplayerDumpTask wrap the params for plan replayer dump
-type PlanReplayerDumpTask struct {
-    SessionBindings []*bindinfo.BindRecord
-    EncodedPlan     string
-    FileName        string
-    Zf              *os.File
-    SessionVars     *variable.SessionVars
-    TblStats        map[int64]*handle.JSONTable
-    ExecStmts       []ast.StmtNode
-    Analyze         bool
-}
-
 // DumpPlanReplayerInfo will dump the information about sqls.
 // The files will be organized into the following format:
 /*
@@ -284,7 +272,7 @@ type PlanReplayerDumpTask struct {
 |-....
 */
 func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
-    task *PlanReplayerDumpTask) (err error) {
+    task *domain.PlanReplayerDumpTask) (err error) {
     zf := task.Zf
     fileName := task.FileName
     sessionVars := task.SessionVars
@@ -373,7 +361,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
     return dumpExplain(sctx, zw, execStmts, task.Analyze)
 }
 
-func generateRecords(task *PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
+func generateRecords(task *domain.PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
     records := make([]domain.PlanReplayerStatusRecord, 0)
     if len(task.ExecStmts) > 0 {
         for _, execStmt := range task.ExecStmts {
diff --git a/server/http_handler_serial_test.go b/server/http_handler_serial_test.go
index b0b3903eeb201..cfc542443c000 100644
--- a/server/http_handler_serial_test.go
+++ b/server/http_handler_serial_test.go
@@ -310,13 +310,13 @@ func TestTiFlashReplica(t *testing.T) {
     require.Equal(t, "a,b", strings.Join(data[0].LocationLabels, ","))
     require.Equal(t, false, data[0].Available)
 
-    resp, err = ts.postStatus("/tiflash/replica-deprecated", "application/json", bytes.NewBuffer([]byte(`{"id":84,"region_count":3,"flash_region_count":3}`)))
+    resp, err = ts.postStatus("/tiflash/replica-deprecated", "application/json", bytes.NewBuffer([]byte(`{"id":184,"region_count":3,"flash_region_count":3}`)))
     require.NoError(t, err)
     require.NotNil(t, resp)
     body, err := io.ReadAll(resp.Body)
     require.NoError(t, err)
     require.NoError(t, resp.Body.Close())
-    require.Equal(t, "[schema:1146]Table which ID = 84 does not exist.", string(body))
+    require.Equal(t, "[schema:1146]Table which ID = 184 does not exist.", string(body))
 
     tbl, err := ts.domain.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test"))
     require.NoError(t, err)
diff --git a/session/bootstrap.go b/session/bootstrap.go
index 86ad22b572045..f73bfd560d3d8 100644
--- a/session/bootstrap.go
+++ b/session/bootstrap.go
@@ -440,6 +440,13 @@ const (
         update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
         fail_reason TEXT,
         instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the plan replayer job');`
+
+    // CreatePlanReplayerTaskTable is a table about plan replayer capture task
+    CreatePlanReplayerTaskTable = `CREATE TABLE IF NOT EXISTS mysql.plan_replayer_task (
+        sql_digest VARCHAR(128) NOT NULL,
+        plan_digest VARCHAR(128) NOT NULL,
+        update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+        PRIMARY KEY (sql_digest,plan_digest));`
 )
 
 // bootstrap initiates system DB for a store.
@@ -656,11 +663,13 @@ const (
     version100 = 100
     // version101 add mysql.plan_replayer_status table
     version101 = 101
+    // version102 add mysql.plan_replayer_task table
+    version102 = 102
 )
 
 // currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
 // please make sure this is the largest version
-var currentBootstrapVersion int64 = version101
+var currentBootstrapVersion int64 = version102
 
 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
 var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -766,6 +775,7 @@ var (
         upgradeToVer98,
         upgradeToVer100,
         upgradeToVer101,
+        upgradeToVer102,
     }
 )
 
@@ -2007,6 +2017,13 @@ func upgradeToVer101(s Session, ver int64) {
     doReentrantDDL(s, CreatePlanReplayerStatusTable)
 }
 
+func upgradeToVer102(s Session, ver int64) {
+    if ver >= version102 {
+        return
+    }
+    doReentrantDDL(s, CreatePlanReplayerTaskTable)
+}
+
 func upgradeToVer99Before(s Session, ver int64) bool {
     if ver >= version99 {
         return false
@@ -2144,6 +2161,8 @@ func doDDLWorks(s Session) {
     mustExecute(s, CreateMDLView)
     // Create plan_replayer_status table
     mustExecute(s, CreatePlanReplayerStatusTable)
+    // Create plan_replayer_task table
+    mustExecute(s, CreatePlanReplayerTaskTable)
 }
 
 // inTestSuite checks if we are bootstrapping in the context of tests.
diff --git a/session/session.go b/session/session.go
index 268c1a9f84092..d2f4afd10b679 100644
--- a/session/session.go
+++ b/session/session.go
@@ -2969,8 +2969,10 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
         }()
     }
 
-    // setup dumpFileGcChecker
+    // setup plan replayer handle
     dom.SetupPlanReplayerHandle(ses[6])
+    dom.StartPlanReplayerHandle()
+    // setup dumpFileGcChecker
     dom.DumpFileGcCheckerLoop()
 
     // A sub context for update table stats, and other contexts for concurrent stats loading.
diff --git a/testkit/mockstore.go b/testkit/mockstore.go
index 21b12eaecd611..525381dd9c148 100644
--- a/testkit/mockstore.go
+++ b/testkit/mockstore.go
@@ -79,6 +79,7 @@ func CreateMockStoreAndDomain(t testing.TB, opts ...mockstore.MockTiKVStoreOptio
 func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Domain {
     session.SetSchemaLease(lease)
     session.DisableStats4Test()
+    domain.DisablePlanReplayerBackgroundJob4Test()
     dom, err := session.BootstrapSession(store)
     require.NoError(t, err)
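
Note on the locking refactor in domain/plan_replayer.go: the patch replaces the handle's single embedded mutex with two independently locked field groups, sctxMu guarding the SQL-executing session context and taskMu guarding the in-memory task set, so readers of the task set never contend with the slower internal-SQL path. A minimal standalone sketch of that pattern (names here are illustrative only, not code from this patch):

package main

import (
    "fmt"
    "sync"
)

// taskKey mirrors the shape of a two-part task identifier.
type taskKey struct {
    sqlDigest  string
    planDigest string
}

// handle groups its state under a named, mutex-guarded struct field,
// the same shape as the taskMu group introduced by the patch.
type handle struct {
    taskMu struct {
        sync.RWMutex
        tasks map[taskKey]struct{}
    }
}

// setTasks replaces the task set under the write lock.
func (h *handle) setTasks(keys []taskKey) {
    m := make(map[taskKey]struct{}, len(keys))
    for _, k := range keys {
        m[k] = struct{}{}
    }
    h.taskMu.Lock()
    defer h.taskMu.Unlock()
    h.taskMu.tasks = m
}

// getTasks copies the task set out under the read lock.
func (h *handle) getTasks() []taskKey {
    h.taskMu.RLock()
    defer h.taskMu.RUnlock()
    out := make([]taskKey, 0, len(h.taskMu.tasks))
    for k := range h.taskMu.tasks {
        out = append(out, k)
    }
    return out
}

func main() {
    h := &handle{}
    h.setTasks([]taskKey{{sqlDigest: "123", planDigest: "123"}})
    fmt.Println(len(h.getTasks())) // prints 1
}

Grouping each mutex with exactly the fields it protects also makes it harder to touch the task map while holding only the session-context lock, since the data is not reachable without going through its own mutex struct.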