diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 097f5f2835be6..f3df3ad0438ff 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -1673,7 +1673,7 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) { } // cancelRunningJob cancel a DDL job that is in the concurrent state. -func cancelRunningJob(_ *sess.Session, job *model.Job, +func cancelRunningJob(job *model.Job, byWho model.AdminCommandOperator) (err error) { // These states can't be cancelled. if job.IsDone() || job.IsSynced() { @@ -1694,7 +1694,7 @@ func cancelRunningJob(_ *sess.Session, job *model.Job, } // pauseRunningJob check and pause the running Job -func pauseRunningJob(_ *sess.Session, job *model.Job, +func pauseRunningJob(job *model.Job, byWho model.AdminCommandOperator) (err error) { if job.IsPausing() || job.IsPaused() { return dbterror.ErrPausedDDLJob.GenWithStackByArgs(job.ID) @@ -1713,7 +1713,7 @@ func pauseRunningJob(_ *sess.Session, job *model.Job, } // resumePausedJob check and resume the Paused Job -func resumePausedJob(_ *sess.Session, job *model.Job, +func resumePausedJob(job *model.Job, byWho model.AdminCommandOperator) (err error) { if !job.IsResumable() { errMsg := fmt.Sprintf("job has not been paused, job state:%s, schema state:%s", @@ -1733,7 +1733,13 @@ func resumePausedJob(_ *sess.Session, job *model.Job, } // processJobs command on the Job according to the process +<<<<<<< HEAD func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), +======= +func processJobs( + ctx context.Context, + process func(*model.Job, model.AdminCommandOperator) (err error), +>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145)) sessCtx sessionctx.Context, ids []int64, byWho model.AdminCommandOperator) (jobErrs []error, err error) { @@ -1779,7 +1785,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera } delete(jobMap, job.ID) - err = process(ns, job, byWho) + err = process(job, byWho) if err != nil { jobErrs[i] = err continue @@ -1844,8 +1850,17 @@ func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err e } // pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage. 
+<<<<<<< HEAD
 func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
 	se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) {
+=======
+func processAllJobs(
+	ctx context.Context,
+	process func(*model.Job, model.AdminCommandOperator) (err error),
+	se sessionctx.Context,
+	byWho model.AdminCommandOperator,
+) (map[int64]error, error) {
+>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
 	var err error
 	var jobErrs = make(map[int64]error)
@@ -1870,7 +1885,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp
 	}
 	for _, job := range jobs {
-		err = process(ns, job, byWho)
+		err = process(job, byWho)
 		if err != nil {
 			jobErrs[job.ID] = err
 			continue
diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go
index 2b17c413d8c8a..50e80cbb0dd2c 100644
--- a/pkg/ddl/ingest/config.go
+++ b/pkg/ddl/ingest/config.go
@@ -48,10 +48,40 @@ func genConfig(
 	memRoot MemRoot,
 	unique bool,
 	resourceGroup string,
+<<<<<<< HEAD
 ) (*litConfig, error) {
 	tidbCfg := tidb.GetGlobalConfig()
 	cfg := lightning.NewConfig()
 	cfg.TikvImporter.Backend = lightning.BackendLocal
+=======
+	concurrency int,
+) (*local.BackendConfig, error) {
+	cfg := &local.BackendConfig{
+		LocalStoreDir:     jobSortPath,
+		ResourceGroupName: resourceGroup,
+		MaxConnPerStore:   concurrency,
+		WorkerConcurrency: concurrency * 2,
+		KeyspaceName:      tidb.GetGlobalKeyspaceName(),
+		// We disable the switch TiKV mode feature for now, because the impact is not
+		// fully tested.
+		ShouldCheckWriteStall: true,
+
+		// lightning default values
+		CheckpointEnabled:           true,
+		BlockSize:                   lightning.DefaultBlockSize,
+		KVWriteBatchSize:            lightning.KVWriteBatchSize,
+		RegionSplitBatchSize:        lightning.DefaultRegionSplitBatchSize,
+		RegionSplitConcurrency:      runtime.GOMAXPROCS(0),
+		MemTableSize:                lightning.DefaultEngineMemCacheSize,
+		LocalWriterMemCacheSize:     lightning.DefaultLocalWriterMemCacheSize,
+		ShouldCheckTiKV:             true,
+		MaxOpenFiles:                int(litRLimit),
+		PausePDSchedulerScope:       lightning.PausePDSchedulerScopeTable,
+		TaskType:                    kvutil.ExplicitTypeDDL,
+		DisableAutomaticCompactions: true,
+		StoreWriteBWLimit:           int(variable.DDLReorgMaxWriteSpeed.Load()),
+	}
+>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
 	// Each backend will build a single dir in lightning dir.
 	cfg.TikvImporter.SortedKVDir = jobSortPath
 	if ImporterRangeConcurrencyForTest != nil {
diff --git a/pkg/ddl/job_submitter.go b/pkg/ddl/job_submitter.go
new file mode 100644
index 0000000000000..7c41669f3c670
--- /dev/null
+++ b/pkg/ddl/job_submitter.go
@@ -0,0 +1,716 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
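The conflicted genConfig hunk above is where the new limit takes effect: the incoming side copies variable.DDLReorgMaxWriteSpeed into the local backend's StoreWriteBWLimit when the ingest backend config for a reorg job is built. A minimal sketch of that flow under stated assumptions, using stand-in names (maxWriteSpeed, backendConfig) rather than the real variable package or local.BackendConfig:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// maxWriteSpeed stands in for variable.DDLReorgMaxWriteSpeed: a process-wide
// atomic holding the current limit in bytes per second (0 means no limit).
var maxWriteSpeed = atomic.NewInt64(0)

// backendConfig is a stand-in for the relevant slice of local.BackendConfig.
type backendConfig struct {
	StoreWriteBWLimit int // write-bandwidth limit applied to TiKV ingest writes
}

// newBackendConfig mirrors the shape of the genConfig change above: the limit
// is read from the global atomic when the backend config is built, so a job
// picks up whatever value is set at backend-construction time.
func newBackendConfig() *backendConfig {
	return &backendConfig{
		StoreWriteBWLimit: int(maxWriteSpeed.Load()),
	}
}

func main() {
	// roughly the effect of SET GLOBAL tidb_ddl_reorg_max_write_speed = '64MiB'
	maxWriteSpeed.Store(64 << 20)
	cfg := newBackendConfig()
	fmt.Printf("store write bandwidth limit: %d bytes/s\n", cfg.StoreWriteBWLimit)
}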
+ +package ddl + +import ( + "bytes" + "context" + "fmt" + "math" + "slices" + "strconv" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/pkg/ddl/logutil" + "github.com/pingcap/tidb/pkg/ddl/serverstate" + sess "github.com/pingcap/tidb/pkg/ddl/session" + "github.com/pingcap/tidb/pkg/ddl/systable" + ddlutil "github.com/pingcap/tidb/pkg/ddl/util" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/owner" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/generic" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/mathutil" + tikv "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +// JobSubmitter collects the DDL jobs and submits them to job tables in batch, it's +// also responsible allocating IDs for the jobs. when fast-create is enabled, it +// will merge the create-table jobs to a single batch create-table job. +// export for testing. +type JobSubmitter struct { + ctx context.Context + etcdCli *clientv3.Client + ownerManager owner.Manager + store kv.Storage + serverStateSyncer serverstate.Syncer + ddlJobDoneChMap *generic.SyncMap[int64, chan struct{}] + + // init at ddl start. + sessPool *sess.Pool + sysTblMgr systable.Manager + minJobIDRefresher *systable.MinJobIDRefresher + + limitJobCh chan *JobWrapper + // get notification if any DDL job submitted or finished. + ddlJobNotifyCh chan struct{} +} + +func (s *JobSubmitter) submitLoop() { + defer util.Recover(metrics.LabelDDL, "submitLoop", nil, true) + + jobWs := make([]*JobWrapper, 0, batchAddingJobs) + ch := s.limitJobCh + for { + select { + // the channel is never closed + case jobW := <-ch: + jobWs = jobWs[:0] + failpoint.InjectCall("afterGetJobFromLimitCh", ch) + jobLen := len(ch) + jobWs = append(jobWs, jobW) + for i := 0; i < jobLen; i++ { + jobWs = append(jobWs, <-ch) + } + s.addBatchDDLJobs(jobWs) + case <-s.ctx.Done(): + return + } + } +} + +// addBatchDDLJobs gets global job IDs and puts the DDL jobs in the DDL queue. +func (s *JobSubmitter) addBatchDDLJobs(jobWs []*JobWrapper) { + startTime := time.Now() + var ( + err error + newWs []*JobWrapper + ) + fastCreate := variable.EnableFastCreateTable.Load() + if fastCreate { + newWs, err = mergeCreateTableJobs(jobWs) + if err != nil { + logutil.DDLLogger().Warn("failed to merge create table jobs", zap.Error(err)) + } else { + jobWs = newWs + } + } + err = s.addBatchDDLJobs2Table(jobWs) + var jobs string + for _, jobW := range jobWs { + if err == nil { + err = jobW.cacheErr + } + jobW.NotifyResult(err) + jobs += jobW.Job.String() + "; " + metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, jobW.Job.Type.String(), + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) + } + if err != nil { + logutil.DDLLogger().Warn("add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err)) + return + } + // Notice worker that we push a new job and wait the job done. 
+ s.notifyNewJobSubmitted() + logutil.DDLLogger().Info("add DDL jobs", + zap.Int("batch count", len(jobWs)), + zap.String("jobs", jobs), + zap.Bool("fast_create", fastCreate)) +} + +// mergeCreateTableJobs merges CreateTable jobs to CreateTables. +func mergeCreateTableJobs(jobWs []*JobWrapper) ([]*JobWrapper, error) { + if len(jobWs) <= 1 { + return jobWs, nil + } + resJobWs := make([]*JobWrapper, 0, len(jobWs)) + mergeableJobWs := make(map[string][]*JobWrapper, len(jobWs)) + for _, jobW := range jobWs { + // we don't merge jobs with ID pre-allocated. + if jobW.Type != model.ActionCreateTable || jobW.IDAllocated { + resJobWs = append(resJobWs, jobW) + continue + } + // ActionCreateTables doesn't support foreign key now. + args := jobW.JobArgs.(*model.CreateTableArgs) + if len(args.TableInfo.ForeignKeys) > 0 { + resJobWs = append(resJobWs, jobW) + continue + } + // CreateTables only support tables of same schema now. + mergeableJobWs[jobW.Job.SchemaName] = append(mergeableJobWs[jobW.Job.SchemaName], jobW) + } + + for schema, jobs := range mergeableJobWs { + total := len(jobs) + if total <= 1 { + resJobWs = append(resJobWs, jobs...) + continue + } + const maxBatchSize = 8 + batchCount := (total + maxBatchSize - 1) / maxBatchSize + start := 0 + for _, batchSize := range mathutil.Divide2Batches(total, batchCount) { + batch := jobs[start : start+batchSize] + newJobW, err := mergeCreateTableJobsOfSameSchema(batch) + if err != nil { + return nil, err + } + start += batchSize + logutil.DDLLogger().Info("merge create table jobs", zap.String("schema", schema), + zap.Int("total", total), zap.Int("batch_size", batchSize)) + resJobWs = append(resJobWs, newJobW) + } + } + return resJobWs, nil +} + +// buildQueryStringFromJobs takes a slice of Jobs and concatenates their +// queries into a single query string. +// Each query is separated by a semicolon and a space. +// Trailing spaces are removed from each query, and a semicolon is appended +// if it's not already present. +func buildQueryStringFromJobs(jobs []*JobWrapper) string { + var queryBuilder strings.Builder + for i, job := range jobs { + q := strings.TrimSpace(job.Query) + if !strings.HasSuffix(q, ";") { + q += ";" + } + queryBuilder.WriteString(q) + + if i < len(jobs)-1 { + queryBuilder.WriteString(" ") + } + } + return queryBuilder.String() +} + +// mergeCreateTableJobsOfSameSchema combine CreateTableJobs to BatchCreateTableJob. 
+func mergeCreateTableJobsOfSameSchema(jobWs []*JobWrapper) (*JobWrapper, error) { + if len(jobWs) == 0 { + return nil, errors.Trace(fmt.Errorf("expect non-empty jobs")) + } + + var ( + combinedJob *model.Job + args = &model.BatchCreateTableArgs{ + Tables: make([]*model.CreateTableArgs, 0, len(jobWs)), + } + involvingSchemaInfo = make([]model.InvolvingSchemaInfo, 0, len(jobWs)) + ) + + // if there is any duplicated table name + duplication := make(map[string]struct{}) + for _, job := range jobWs { + if combinedJob == nil { + combinedJob = job.Clone() + combinedJob.Type = model.ActionCreateTables + } + jobArgs := job.JobArgs.(*model.CreateTableArgs) + args.Tables = append(args.Tables, jobArgs) + + info := jobArgs.TableInfo + if _, ok := duplication[info.Name.L]; ok { + // return err even if create table if not exists + return nil, infoschema.ErrTableExists.FastGenByArgs("can not batch create tables with same name") + } + + duplication[info.Name.L] = struct{}{} + + involvingSchemaInfo = append(involvingSchemaInfo, + model.InvolvingSchemaInfo{ + Database: job.SchemaName, + Table: info.Name.L, + }) + } + + combinedJob.InvolvingSchemaInfo = involvingSchemaInfo + combinedJob.Query = buildQueryStringFromJobs(jobWs) + + newJobW := &JobWrapper{ + Job: combinedJob, + JobArgs: args, + ResultCh: make([]chan jobSubmitResult, 0, len(jobWs)), + } + // merge the result channels. + for _, j := range jobWs { + newJobW.ResultCh = append(newJobW.ResultCh, j.ResultCh...) + } + + return newJobW, nil +} + +// addBatchDDLJobs2Table gets global job IDs and puts the DDL jobs in the DDL job table. +func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error { + var err error + + if len(jobWs) == 0 { + return nil + } + + ctx := kv.WithInternalSourceType(s.ctx, kv.InternalTxnDDL) + se, err := s.sessPool.Get() + if err != nil { + return errors.Trace(err) + } + defer s.sessPool.Put(se) + found, err := s.sysTblMgr.HasFlashbackClusterJob(ctx, s.minJobIDRefresher.GetCurrMinJobID()) + if err != nil { + return errors.Trace(err) + } + if found { + return errors.Errorf("Can't add ddl job, have flashback cluster job") + } + + var ( + startTS = uint64(0) + bdrRole = string(ast.BDRRoleNone) + ) + + err = kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error { + t := meta.NewMutator(txn) + + bdrRole, err = t.GetBDRRole() + if err != nil { + return errors.Trace(err) + } + startTS = txn.StartTS() + + return nil + }) + if err != nil { + return errors.Trace(err) + } + + for _, jobW := range jobWs { + job := jobW.Job + intest.Assert(job.Version != 0, "Job version should not be zero") + + job.StartTS = startTS + job.BDRRole = bdrRole + + // BDR mode only affects the DDL not from CDC + if job.CDCWriteSource == 0 && bdrRole != string(ast.BDRRoleNone) { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + for _, subJob := range job.MultiSchemaInfo.SubJobs { + if DeniedByBDR(ast.BDRRole(bdrRole), subJob.Type, subJob.JobArgs) { + return dbterror.ErrBDRRestrictedDDL.FastGenByArgs(bdrRole) + } + } + } else if DeniedByBDR(ast.BDRRole(bdrRole), job.Type, jobW.JobArgs) { + return dbterror.ErrBDRRestrictedDDL.FastGenByArgs(bdrRole) + } + } + + setJobStateToQueueing(job) + + if s.serverStateSyncer.IsUpgradingState() && !hasSysDB(job) { + if err = pauseRunningJob(job, model.AdminCommandBySystem); err != nil { + logutil.DDLUpgradingLogger().Warn("pause user DDL by system failed", zap.Stringer("job", job), zap.Error(err)) + jobW.cacheErr = err + continue + } + 
logutil.DDLUpgradingLogger().Info("pause user DDL by system successful", zap.Stringer("job", job)) + } + } + + se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) + ddlSe := sess.NewSession(se) + if err = s.GenGIDAndInsertJobsWithRetry(ctx, ddlSe, jobWs); err != nil { + return errors.Trace(err) + } + + return nil +} + +// GenGIDAndInsertJobsWithRetry generate job related global ID and inserts DDL jobs to the DDL job +// table with retry. job id allocation and job insertion are in the same transaction, +// as we want to make sure DDL jobs are inserted in id order, then we can query from +// a min job ID when scheduling DDL jobs to mitigate https://github.com/pingcap/tidb/issues/52905. +// so this function has side effect, it will set table/db/job id of 'jobs'. +func (s *JobSubmitter) GenGIDAndInsertJobsWithRetry(ctx context.Context, ddlSe *sess.Session, jobWs []*JobWrapper) error { + savedJobIDs := make([]int64, len(jobWs)) + count := getRequiredGIDCount(jobWs) + return genGIDAndCallWithRetry(ctx, ddlSe, count, func(ids []int64) error { + failpoint.Inject("mockGenGlobalIDFail", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.New("gofail genGlobalIDs error")) + } + }) + assignGIDsForJobs(jobWs, ids) + // job scheduler will start run them after txn commit, we want to make sure + // the channel exists before the jobs are submitted. + for i, jobW := range jobWs { + if savedJobIDs[i] > 0 { + // in case of retry + s.ddlJobDoneChMap.Delete(savedJobIDs[i]) + } + s.ddlJobDoneChMap.Store(jobW.ID, make(chan struct{}, 1)) + savedJobIDs[i] = jobW.ID + } + failpoint.Inject("mockGenGIDRetryableError", func() { + failpoint.Return(kv.ErrTxnRetryable) + }) + return insertDDLJobs2Table(ctx, ddlSe, jobWs...) + }) +} + +type gidAllocator struct { + idx int + ids []int64 +} + +func (a *gidAllocator) next() int64 { + id := a.ids[a.idx] + a.idx++ + return id +} + +func (a *gidAllocator) assignIDsForTable(info *model.TableInfo) { + info.ID = a.next() + if partitionInfo := info.GetPartitionInfo(); partitionInfo != nil { + a.assignIDsForPartitionInfo(partitionInfo) + } +} + +func (a *gidAllocator) assignIDsForPartitionInfo(partitionInfo *model.PartitionInfo) { + for i := range partitionInfo.Definitions { + partitionInfo.Definitions[i].ID = a.next() + } +} + +func idCountForTable(info *model.TableInfo) int { + c := 1 + if partitionInfo := info.GetPartitionInfo(); partitionInfo != nil { + c += len(partitionInfo.Definitions) + } + return c +} + +// getRequiredGIDCount returns the count of required global IDs for the jobs. it's calculated +// as: the count of jobs + the count of IDs for the jobs which do NOT have pre-allocated ID. 
+func getRequiredGIDCount(jobWs []*JobWrapper) int { + count := len(jobWs) + for _, jobW := range jobWs { + if jobW.IDAllocated { + continue + } + switch jobW.Type { + case model.ActionCreateView, model.ActionCreateSequence, model.ActionCreateTable: + args := jobW.JobArgs.(*model.CreateTableArgs) + count += idCountForTable(args.TableInfo) + case model.ActionCreateTables: + args := jobW.JobArgs.(*model.BatchCreateTableArgs) + for _, tblArgs := range args.Tables { + count += idCountForTable(tblArgs.TableInfo) + } + case model.ActionCreateSchema, model.ActionCreateResourceGroup: + count++ + case model.ActionAlterTablePartitioning: + args := jobW.JobArgs.(*model.TablePartitionArgs) + // A new table ID would be needed for + // the global table, which cannot be the same as the current table id, + // since this table id will be removed in the final state when removing + // all the data with this table id. + count += 1 + len(args.PartInfo.Definitions) + case model.ActionTruncateTablePartition: + count += len(jobW.JobArgs.(*model.TruncateTableArgs).OldPartitionIDs) + case model.ActionAddTablePartition, model.ActionReorganizePartition, model.ActionRemovePartitioning: + args := jobW.JobArgs.(*model.TablePartitionArgs) + count += len(args.PartInfo.Definitions) + case model.ActionTruncateTable: + count += 1 + len(jobW.JobArgs.(*model.TruncateTableArgs).OldPartitionIDs) + } + } + return count +} + +// assignGIDsForJobs should be used with getRequiredGIDCount, and len(ids) must equal +// what getRequiredGIDCount returns. +func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) { + alloc := &gidAllocator{ids: ids} + for _, jobW := range jobWs { + switch jobW.Type { + case model.ActionCreateView, model.ActionCreateSequence, model.ActionCreateTable: + args := jobW.JobArgs.(*model.CreateTableArgs) + if !jobW.IDAllocated { + alloc.assignIDsForTable(args.TableInfo) + } + jobW.TableID = args.TableInfo.ID + case model.ActionCreateTables: + if !jobW.IDAllocated { + args := jobW.JobArgs.(*model.BatchCreateTableArgs) + for _, tblArgs := range args.Tables { + alloc.assignIDsForTable(tblArgs.TableInfo) + } + } + case model.ActionCreateSchema: + dbInfo := jobW.JobArgs.(*model.CreateSchemaArgs).DBInfo + if !jobW.IDAllocated { + dbInfo.ID = alloc.next() + } + jobW.SchemaID = dbInfo.ID + case model.ActionCreateResourceGroup: + if !jobW.IDAllocated { + args := jobW.JobArgs.(*model.ResourceGroupArgs) + args.RGInfo.ID = alloc.next() + } + case model.ActionAlterTablePartitioning: + if !jobW.IDAllocated { + args := jobW.JobArgs.(*model.TablePartitionArgs) + alloc.assignIDsForPartitionInfo(args.PartInfo) + args.PartInfo.NewTableID = alloc.next() + } + case model.ActionAddTablePartition, model.ActionReorganizePartition: + if !jobW.IDAllocated { + pInfo := jobW.JobArgs.(*model.TablePartitionArgs).PartInfo + alloc.assignIDsForPartitionInfo(pInfo) + } + case model.ActionRemovePartitioning: + // a special partition is used in this case, and we will use the ID + // of the partition as the new table ID. 
+ pInfo := jobW.JobArgs.(*model.TablePartitionArgs).PartInfo + if !jobW.IDAllocated { + alloc.assignIDsForPartitionInfo(pInfo) + } + pInfo.NewTableID = pInfo.Definitions[0].ID + case model.ActionTruncateTable, model.ActionTruncateTablePartition: + if !jobW.IDAllocated { + args := jobW.JobArgs.(*model.TruncateTableArgs) + if jobW.Type == model.ActionTruncateTable { + args.NewTableID = alloc.next() + } + partIDs := make([]int64, len(args.OldPartitionIDs)) + for i := range partIDs { + partIDs[i] = alloc.next() + } + args.NewPartitionIDs = partIDs + } + } + jobW.ID = alloc.next() + } +} + +// genGIDAndCallWithRetry generates global IDs and calls the function with retry. +// generate ID and call function runs in the same transaction. +func genGIDAndCallWithRetry(ctx context.Context, ddlSe *sess.Session, count int, fn func(ids []int64) error) error { + var resErr error + for i := uint(0); i < kv.MaxRetryCnt; i++ { + resErr = func() (err error) { + if err := ddlSe.Begin(ctx); err != nil { + return errors.Trace(err) + } + defer func() { + if err != nil { + ddlSe.Rollback() + } + }() + txn, err := ddlSe.Txn() + if err != nil { + return errors.Trace(err) + } + txn.SetOption(kv.Pessimistic, true) + forUpdateTS, err := lockGlobalIDKey(ctx, ddlSe, txn) + if err != nil { + return errors.Trace(err) + } + txn.GetSnapshot().SetOption(kv.SnapshotTS, forUpdateTS) + + m := meta.NewMutator(txn) + ids, err := m.GenGlobalIDs(count) + if err != nil { + return errors.Trace(err) + } + if err = fn(ids); err != nil { + return errors.Trace(err) + } + return ddlSe.Commit(ctx) + }() + + if resErr != nil && kv.IsTxnRetryableError(resErr) { + logutil.DDLLogger().Warn("insert job meet retryable error", zap.Error(resErr)) + kv.BackOff(i) + failpoint.InjectCall("onGenGIDRetry") + continue + } + break + } + return resErr +} + +// lockGlobalIDKey locks the global ID key in the meta store. it keeps trying if +// meet write conflict, we cannot have a fixed retry count for this error, see this +// https://github.com/pingcap/tidb/issues/27197#issuecomment-2216315057. +// this part is same as how we implement pessimistic + repeatable read isolation +// level in SQL executor, see doLockKeys. +// NextGlobalID is a meta key, so we cannot use "select xx for update", if we store +// it into a table row or using advisory lock, we will depends on a system table +// that is created by us, cyclic. although we can create a system table without using +// DDL logic, we will only consider change it when we have data dictionary and keep +// it this way now. +// TODO maybe we can unify the lock mechanism with SQL executor in the future, or +// implement it inside TiKV client-go. +func lockGlobalIDKey(ctx context.Context, ddlSe *sess.Session, txn kv.Transaction) (uint64, error) { + var ( + iteration uint + forUpdateTs = txn.StartTS() + ver kv.Version + err error + ) + waitTime := ddlSe.GetSessionVars().LockWaitTimeout + m := meta.NewMutator(txn) + idKey := m.GlobalIDKey() + for { + lockCtx := tikv.NewLockCtx(forUpdateTs, waitTime, time.Now()) + err = txn.LockKeys(ctx, lockCtx, idKey) + if err == nil || !terror.ErrorEqual(kv.ErrWriteConflict, err) { + break + } + // ErrWriteConflict contains a conflict-commit-ts in most case, but it cannot + // be used as forUpdateTs, see comments inside handleAfterPessimisticLockError + ver, err = ddlSe.GetStore().CurrentVersion(oracle.GlobalTxnScope) + if err != nil { + break + } + forUpdateTs = ver.Ver + + kv.BackOff(iteration) + // avoid it keep growing and overflow. 
+ iteration = min(iteration+1, math.MaxInt) + } + return forUpdateTs, err +} + +func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error { + failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr")) + } + }) + if len(jobWs) == 0 { + return nil + } + var sql bytes.Buffer + sql.WriteString("insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values") + for i, jobW := range jobWs { + jobW.FillArgsWithSubJobs() + b, err := jobW.Encode(true) + if err != nil { + return err + } + if i != 0 { + sql.WriteString(",") + } + fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(), + strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)), + ddlutil.WrapKey2String(b), jobW.Type, jobW.Started()) + } + se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) + _, err := se.Execute(ctx, sql.String(), "insert_job") + logutil.DDLLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("sql", sql.String())) + return errors.Trace(err) +} + +func makeStringForIDs(ids []int64) string { + set := make(map[int64]struct{}, len(ids)) + for _, id := range ids { + set[id] = struct{}{} + } + + s := make([]string, 0, len(set)) + for id := range set { + s = append(s, strconv.FormatInt(id, 10)) + } + slices.Sort(s) + return strings.Join(s, ",") +} + +func job2SchemaIDs(jobW *JobWrapper) string { + switch jobW.Type { + case model.ActionRenameTables: + var ids []int64 + arg := jobW.JobArgs.(*model.RenameTablesArgs) + ids = make([]int64, 0, len(arg.RenameTableInfos)*2) + for _, info := range arg.RenameTableInfos { + ids = append(ids, info.OldSchemaID, info.NewSchemaID) + } + return makeStringForIDs(ids) + case model.ActionRenameTable: + oldSchemaID := jobW.JobArgs.(*model.RenameTableArgs).OldSchemaID + ids := []int64{oldSchemaID, jobW.SchemaID} + return makeStringForIDs(ids) + case model.ActionExchangeTablePartition: + args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs) + return makeStringForIDs([]int64{jobW.SchemaID, args.PTSchemaID}) + default: + return strconv.FormatInt(jobW.SchemaID, 10) + } +} + +func job2TableIDs(jobW *JobWrapper) string { + switch jobW.Type { + case model.ActionRenameTables: + var ids []int64 + arg := jobW.JobArgs.(*model.RenameTablesArgs) + ids = make([]int64, 0, len(arg.RenameTableInfos)) + for _, info := range arg.RenameTableInfos { + ids = append(ids, info.TableID) + } + return makeStringForIDs(ids) + case model.ActionExchangeTablePartition: + args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs) + return makeStringForIDs([]int64{jobW.TableID, args.PTTableID}) + case model.ActionTruncateTable: + newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID + return strconv.FormatInt(jobW.TableID, 10) + "," + strconv.FormatInt(newTableID, 10) + default: + return strconv.FormatInt(jobW.TableID, 10) + } +} + +func setJobStateToQueueing(job *model.Job) { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + for _, sub := range job.MultiSchemaInfo.SubJobs { + sub.State = model.JobStateQueueing + } + } + job.State = model.JobStateQueueing +} + +func (s *JobSubmitter) notifyNewJobSubmitted() { + if s.ownerManager.IsOwner() { + asyncNotify(s.ddlJobNotifyCh) + return + } + s.notifyNewJobByEtcd() +} + +func (s *JobSubmitter) notifyNewJobByEtcd() { + if s.etcdCli == nil { + return + } + + err := ddlutil.PutKVToEtcd(s.ctx, s.etcdCli, 1, 
addingDDLJobNotifyKey, "0") + if err != nil { + logutil.DDLLogger().Info("notify new DDL job failed", zap.Error(err)) + } +} diff --git a/pkg/ddl/util/util.go b/pkg/ddl/util/util.go index 22a29a0e539d1..b0e9e4ece841e 100644 --- a/pkg/ddl/util/util.go +++ b/pkg/ddl/util/util.go @@ -48,7 +48,7 @@ const ( completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %?` updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` - loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")" + loadGlobalVarsSQL = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")" // KeyOpDefaultTimeout is the default timeout for each key operation. KeyOpDefaultTimeout = 2 * time.Second // KeyOpRetryInterval is the interval between two key operations. @@ -185,17 +185,18 @@ func UpdateDeleteRange(sctx sessionctx.Context, dr DelRangeTask, newStartKey, ol func LoadDDLReorgVars(ctx context.Context, sctx sessionctx.Context) error { // close issue #21391 // variable.TiDBRowFormatVersion is used to encode the new row for column type change. - return LoadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion}) + return loadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion}) } // LoadDDLVars loads ddl variable from mysql.global_variables. func LoadDDLVars(ctx sessionctx.Context) error { - return LoadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit}) + return loadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit}) } -// LoadGlobalVars loads global variable from mysql.global_variables. -func LoadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error { +// loadGlobalVars loads global variable from mysql.global_variables. +func loadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) +<<<<<<< HEAD // *mock.Context does not support SQL execution. Only do it when sctx is not `mock.Context` if _, ok := sctx.(*mock.Context); !ok { e := sctx.GetRestrictedSQLExecutor() @@ -208,6 +209,15 @@ func LoadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []str } buf.WriteString("%?") paramNames = append(paramNames, name) +======= + e := sctx.GetRestrictedSQLExecutor() + var buf strings.Builder + buf.WriteString(loadGlobalVarsSQL) + paramNames := make([]any, 0, len(varNames)) + for i, name := range varNames { + if i > 0 { + buf.WriteString(", ") +>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145)) } buf.WriteString(")") rows, _, err := e.ExecRestrictedSQL(ctx, nil, buf.String(), paramNames...) 
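Both sides of the loadGlobalVars conflict above build one parameterized IN (...) list for the requested variable names and hand it to the restricted SQL executor; the incoming side simply drops the *mock.Context special case. A self-contained sketch of the same placeholder-building pattern, independent of TiDB's executor types:

package main

import (
	"fmt"
	"strings"
)

// buildInClauseQuery expands a base query ending in "in (" into
// "... in (%?, %?, ...)" plus the matching parameter slice, which is the
// shape loadGlobalVars passes to ExecRestrictedSQL.
func buildInClauseQuery(base string, names []string) (string, []any) {
	var buf strings.Builder
	buf.WriteString(base)
	params := make([]any, 0, len(names))
	for i, name := range names {
		if i > 0 {
			buf.WriteString(", ")
		}
		buf.WriteString("%?") // placeholder marker used by TiDB's restricted SQL API
		params = append(params, name)
	}
	buf.WriteString(")")
	return buf.String(), params
}

func main() {
	const base = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (`
	sql, params := buildInClauseQuery(base, []string{"tidb_ddl_reorg_worker_cnt", "tidb_ddl_reorg_batch_size"})
	fmt.Println(sql)
	fmt.Println(params...)
}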
diff --git a/pkg/executor/test/ddl/BUILD.bazel b/pkg/executor/test/ddl/BUILD.bazel index 3750a5c2bf3a9..9afea7a879edc 100644 --- a/pkg/executor/test/ddl/BUILD.bazel +++ b/pkg/executor/test/ddl/BUILD.bazel @@ -8,7 +8,11 @@ go_test( "main_test.go", ], flaky = True, +<<<<<<< HEAD shard_count = 17, +======= + shard_count = 21, +>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145)) deps = [ "//pkg/config", "//pkg/ddl/schematracker", @@ -32,6 +36,7 @@ go_test( "//pkg/util/chunk", "//pkg/util/dbterror", "//pkg/util/dbterror/plannererrors", + "@com_github_docker_go_units//:go-units", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", diff --git a/pkg/executor/test/ddl/ddl_test.go b/pkg/executor/test/ddl/ddl_test.go index d1232a1de397f..f7aa2d13e37b1 100644 --- a/pkg/executor/test/ddl/ddl_test.go +++ b/pkg/executor/test/ddl/ddl_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/docker/go-units" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/schematracker" ddltestutil "github.com/pingcap/tidb/pkg/ddl/testutil" @@ -879,6 +880,31 @@ func TestSetDDLErrorCountLimit(t *testing.T) { res.Check(testkit.Rows("100")) } +func TestSetDDLReorgMaxWriteSpeed(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + require.Equal(t, int64(variable.DefTiDBDDLReorgMaxWriteSpeed), variable.DDLReorgMaxWriteSpeed.Load()) + + // valid values + for _, val := range []int64{1, 0, 100, 1024 * 1024, 2147483647, units.PiB} { + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = %d", val)) + require.Equal(t, val, variable.DDLReorgMaxWriteSpeed.Load()) + tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(val, 10))) + } + for _, val := range []string{"1", "0", "100", "2KB", "3MiB", "4 gb", "2147483647", "1125899906842624" /* 1PiB */} { + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = '%s'", val)) + expected, err := units.RAMInBytes(val) + require.NoError(t, err) + require.Equal(t, expected, variable.DDLReorgMaxWriteSpeed.Load()) + tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(expected, 10))) + } + + // invalid values + tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = -1") + tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = invalid_val") + tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = %d", units.PiB+1) +} + func TestLoadDDLDistributeVars(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 07d95984ebca6..636588c11b1b0 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -26,6 +26,7 @@ import ( "sync/atomic" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/keyspace" @@ -765,6 +766,23 @@ var defaultSysVars = []*SysVar{ SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) return nil }}, + {Scope: ScopeGlobal, Name: TiDBDDLReorgMaxWriteSpeed, Value: strconv.Itoa(DefTiDBDDLReorgMaxWriteSpeed), Type: TypeStr, + SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + i64, err := units.RAMInBytes(val) + if err != nil { + return 
errors.Trace(err) + } + if i64 < 0 || i64 > units.PiB { + // Here we limit the max value to 1 PiB instead of math.MaxInt64, since: + // 1. it is large enough + // 2. units.RAMInBytes would first cast the size to a float, and may lose precision when the size is too large + return fmt.Errorf("invalid value for '%d', it should be within [%d, %d]", i64, 0, units.PiB) + } + DDLReorgMaxWriteSpeed.Store(i64) + return nil + }, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { + return strconv.FormatInt(DDLReorgMaxWriteSpeed.Load(), 10), nil + }}, {Scope: ScopeGlobal, Name: TiDBDDLErrorCountLimit, Value: strconv.Itoa(DefTiDBDDLErrorCountLimit), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt64, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLErrorCountLimit(TidbOptInt64(val, DefTiDBDDLErrorCountLimit)) return nil diff --git a/pkg/sessionctx/variable/tidb_vars.go b/pkg/sessionctx/variable/tidb_vars.go index 37c76aa6a4375..d50581128ab30 100644 --- a/pkg/sessionctx/variable/tidb_vars.go +++ b/pkg/sessionctx/variable/tidb_vars.go @@ -509,6 +509,9 @@ const ( // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" + // TiDBDDLReorgMaxWriteSpeed defines the max write limitation for the lightning local backend + TiDBDDLReorgMaxWriteSpeed = "tidb_ddl_reorg_max_write_speed" + // TiDBEnableAutoIncrementInGenerated disables the mysql compatibility check on using auto-incremented columns in // expression indexes and generated columns described here https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html for details. TiDBEnableAutoIncrementInGenerated = "tidb_enable_auto_increment_in_generated" @@ -1172,6 +1175,7 @@ const ( // Default TiDB system variable values. 
const ( +<<<<<<< HEAD DefHostname = "localhost" DefIndexLookupConcurrency = ConcurrencyUnset DefIndexLookupJoinConcurrency = ConcurrencyUnset @@ -1398,6 +1402,241 @@ const ( DefTiDBEnableNonPreparedPlanCacheForDML = false DefTiDBNonPreparedPlanCacheSize = 100 DefTiDBPlanCacheMaxPlanSize = 2 * size.MB +======= + DefHostname = "localhost" + DefIndexLookupConcurrency = ConcurrencyUnset + DefIndexLookupJoinConcurrency = ConcurrencyUnset + DefIndexSerialScanConcurrency = 1 + DefIndexJoinBatchSize = 25000 + DefIndexLookupSize = 20000 + DefDistSQLScanConcurrency = 15 + DefAnalyzeDistSQLScanConcurrency = 4 + DefBuildStatsConcurrency = 2 + DefBuildSamplingStatsConcurrency = 2 + DefAutoAnalyzeRatio = 0.5 + DefAutoAnalyzeStartTime = "00:00 +0000" + DefAutoAnalyzeEndTime = "23:59 +0000" + DefAutoIncrementIncrement = 1 + DefAutoIncrementOffset = 1 + DefChecksumTableConcurrency = 4 + DefSkipUTF8Check = false + DefSkipASCIICheck = false + DefOptAggPushDown = false + DefOptDeriveTopN = false + DefOptCartesianBCJ = 1 + DefOptMPPOuterJoinFixedBuildSide = false + DefOptWriteRowID = false + DefOptEnableCorrelationAdjustment = true + DefOptLimitPushDownThreshold = 100 + DefOptCorrelationThreshold = 0.9 + DefOptCorrelationExpFactor = 1 + DefOptCPUFactor = 3.0 + DefOptCopCPUFactor = 3.0 + DefOptTiFlashConcurrencyFactor = 24.0 + DefOptNetworkFactor = 1.0 + DefOptScanFactor = 1.5 + DefOptDescScanFactor = 3.0 + DefOptSeekFactor = 20.0 + DefOptMemoryFactor = 0.001 + DefOptDiskFactor = 1.5 + DefOptConcurrencyFactor = 3.0 + DefOptForceInlineCTE = false + DefOptInSubqToJoinAndAgg = true + DefOptPreferRangeScan = true + DefBatchInsert = false + DefBatchDelete = false + DefBatchCommit = false + DefCurretTS = 0 + DefInitChunkSize = 32 + DefMinPagingSize = int(paging.MinPagingSize) + DefMaxPagingSize = int(paging.MaxPagingSize) + DefMaxChunkSize = 1024 + DefDMLBatchSize = 0 + DefMaxPreparedStmtCount = -1 + DefWaitTimeout = 28800 + DefTiDBMemQuotaApplyCache = 32 << 20 // 32MB. + DefTiDBMemQuotaBindingCache = 64 << 20 // 64MB. 
+ DefTiDBGeneralLog = false + DefTiDBPProfSQLCPU = 0 + DefTiDBRetryLimit = 10 + DefTiDBDisableTxnAutoRetry = true + DefTiDBConstraintCheckInPlace = false + DefTiDBHashJoinConcurrency = ConcurrencyUnset + DefTiDBProjectionConcurrency = ConcurrencyUnset + DefBroadcastJoinThresholdSize = 100 * 1024 * 1024 + DefBroadcastJoinThresholdCount = 10 * 1024 + DefPreferBCJByExchangeDataSize = false + DefTiDBOptimizerSelectivityLevel = 0 + DefTiDBOptimizerEnableNewOFGB = false + DefTiDBEnableOuterJoinReorder = true + DefTiDBEnableNAAJ = true + DefTiDBAllowBatchCop = 1 + DefShardRowIDBits = 0 + DefPreSplitRegions = 0 + DefBlockEncryptionMode = "aes-128-ecb" + DefTiDBAllowMPPExecution = true + DefTiDBAllowTiFlashCop = false + DefTiDBHashExchangeWithNewCollation = true + DefTiDBEnforceMPPExecution = false + DefTiFlashMaxThreads = -1 + DefTiFlashMaxBytesBeforeExternalJoin = -1 + DefTiFlashMaxBytesBeforeExternalGroupBy = -1 + DefTiFlashMaxBytesBeforeExternalSort = -1 + DefTiFlashMemQuotaQueryPerNode = 0 + DefTiFlashQuerySpillRatio = 0.7 + DefTiDBEnableTiFlashPipelineMode = true + DefTiDBMPPStoreFailTTL = "60s" + DefTiDBTxnMode = PessimisticTxnMode + DefTiDBRowFormatV1 = 1 + DefTiDBRowFormatV2 = 2 + DefTiDBDDLReorgWorkerCount = 4 + DefTiDBDDLReorgBatchSize = 256 + DefTiDBDDLFlashbackConcurrency = 64 + DefTiDBDDLErrorCountLimit = 512 + DefTiDBDDLReorgMaxWriteSpeed = 0 + DefTiDBMaxDeltaSchemaCount = 1024 + DefTiDBPlacementMode = PlacementModeStrict + DefTiDBEnableAutoIncrementInGenerated = false + DefTiDBHashAggPartialConcurrency = ConcurrencyUnset + DefTiDBHashAggFinalConcurrency = ConcurrencyUnset + DefTiDBWindowConcurrency = ConcurrencyUnset + DefTiDBMergeJoinConcurrency = 1 // disable optimization by default + DefTiDBStreamAggConcurrency = 1 + DefTiDBForcePriority = mysql.NoPriority + DefEnableWindowFunction = true + DefEnablePipelinedWindowFunction = true + DefEnableStrictDoubleTypeCheck = true + DefEnableVectorizedExpression = true + DefTiDBOptJoinReorderThreshold = 0 + DefTiDBDDLSlowOprThreshold = 300 + DefTiDBUseFastAnalyze = false + DefTiDBSkipIsolationLevelCheck = false + DefTiDBExpensiveQueryTimeThreshold = 60 // 60s + DefTiDBExpensiveTxnTimeThreshold = 60 * 10 // 10 minutes + DefTiDBScatterRegion = ScatterOff + DefTiDBWaitSplitRegionFinish = true + DefWaitSplitRegionTimeout = 300 // 300s + DefTiDBEnableNoopFuncs = Off + DefTiDBEnableNoopVariables = true + DefTiDBAllowRemoveAutoInc = false + DefTiDBUsePlanBaselines = true + DefTiDBEvolvePlanBaselines = false + DefTiDBEvolvePlanTaskMaxTime = 600 // 600s + DefTiDBEvolvePlanTaskStartTime = "00:00 +0000" + DefTiDBEvolvePlanTaskEndTime = "23:59 +0000" + DefInnodbLockWaitTimeout = 50 // 50s + DefTiDBStoreLimit = 0 + DefTiDBMetricSchemaStep = 60 // 60s + DefTiDBMetricSchemaRangeDuration = 60 // 60s + DefTiDBFoundInPlanCache = false + DefTiDBFoundInBinding = false + DefTiDBEnableCollectExecutionInfo = true + DefTiDBAllowAutoRandExplicitInsert = false + DefTiDBEnableClusteredIndex = ClusteredIndexDefModeOn + DefTiDBRedactLog = Off + DefTiDBRestrictedReadOnly = false + DefTiDBSuperReadOnly = false + DefTiDBShardAllocateStep = math.MaxInt64 + DefTiDBEnableTelemetry = false + DefTiDBEnableParallelApply = false + DefTiDBPartitionPruneMode = "dynamic" + DefTiDBEnableRateLimitAction = false + DefTiDBEnableAsyncCommit = false + DefTiDBEnable1PC = false + DefTiDBGuaranteeLinearizability = true + DefTiDBAnalyzeVersion = 2 + // Deprecated: This variable is deprecated, please do not use this variable. 
+ DefTiDBAutoAnalyzePartitionBatchSize = mysql.PartitionCountLimit + DefTiDBEnableIndexMergeJoin = false + DefTiDBTrackAggregateMemoryUsage = true + DefCTEMaxRecursionDepth = 1000 + DefTiDBTmpTableMaxSize = 64 << 20 // 64MB. + DefTiDBEnableLocalTxn = false + DefTiDBTSOClientBatchMaxWaitTime = 0.0 // 0ms + DefTiDBEnableTSOFollowerProxy = false + DefPDEnableFollowerHandleRegion = false + DefTiDBEnableOrderedResultMode = false + DefTiDBEnablePseudoForOutdatedStats = false + DefTiDBRegardNULLAsPoint = true + DefEnablePlacementCheck = true + DefTimestamp = "0" + DefTimestampFloat = 0.0 + DefTiDBEnableStmtSummary = true + DefTiDBStmtSummaryInternalQuery = false + DefTiDBStmtSummaryRefreshInterval = 1800 + DefTiDBStmtSummaryHistorySize = 24 + DefTiDBStmtSummaryMaxStmtCount = 3000 + DefTiDBStmtSummaryMaxSQLLength = 4096 + DefTiDBCapturePlanBaseline = Off + DefTiDBIgnoreInlistPlanDigest = false + DefTiDBEnableIndexMerge = true + DefEnableLegacyInstanceScope = true + DefTiDBTableCacheLease = 3 // 3s + DefTiDBPersistAnalyzeOptions = true + DefTiDBStatsLoadSyncWait = 100 + DefTiDBStatsLoadPseudoTimeout = true + DefSysdateIsNow = false + DefTiDBEnableParallelHashaggSpill = true + DefTiDBEnableMutationChecker = false + DefTiDBTxnAssertionLevel = AssertionOffStr + DefTiDBIgnorePreparedCacheCloseStmt = false + DefTiDBBatchPendingTiFlashCount = 4000 + DefRCReadCheckTS = false + DefTiDBRemoveOrderbyInSubquery = true + DefTiDBSkewDistinctAgg = false + DefTiDB3StageDistinctAgg = true + DefTiDB3StageMultiDistinctAgg = false + DefTiDBOptExplainEvaledSubquery = false + DefTiDBReadStaleness = 0 + DefTiDBGCMaxWaitTime = 24 * 60 * 60 + DefMaxAllowedPacket uint64 = 67108864 + DefTiDBEnableBatchDML = false + DefTiDBMemQuotaQuery = memory.DefMemQuotaQuery // 1GB + DefTiDBStatsCacheMemQuota = 0 + MaxTiDBStatsCacheMemQuota = 1024 * 1024 * 1024 * 1024 // 1TB + DefTiDBQueryLogMaxLen = 4096 + DefRequireSecureTransport = false + DefTiDBCommitterConcurrency = 128 + DefTiDBBatchDMLIgnoreError = false + DefTiDBMemQuotaAnalyze = -1 + DefTiDBEnableAutoAnalyze = true + DefTiDBEnableAutoAnalyzePriorityQueue = true + DefTiDBAnalyzeColumnOptions = "PREDICATE" + DefTiDBMemOOMAction = "CANCEL" + DefTiDBMaxAutoAnalyzeTime = 12 * 60 * 60 + DefTiDBAutoAnalyzeConcurrency = 1 + DefTiDBEnablePrepPlanCache = true + DefTiDBPrepPlanCacheSize = 100 + DefTiDBSessionPlanCacheSize = 100 + DefTiDBEnablePrepPlanCacheMemoryMonitor = true + DefTiDBPrepPlanCacheMemoryGuardRatio = 0.1 + DefTiDBEnableDistTask = true + DefTiDBEnableFastCreateTable = true + DefTiDBSimplifiedMetrics = false + DefTiDBEnablePaging = true + DefTiFlashFineGrainedShuffleStreamCount = 0 + DefStreamCountWhenMaxThreadsNotSet = 8 + DefTiFlashFineGrainedShuffleBatchSize = 8192 + DefAdaptiveClosestReadThreshold = 4096 + DefTiDBEnableAnalyzeSnapshot = false + DefTiDBGenerateBinaryPlan = true + DefEnableTiDBGCAwareMemoryTrack = false + DefTiDBDefaultStrMatchSelectivity = 0.8 + DefTiDBEnableTmpStorageOnOOM = true + DefTiDBEnableMDL = true + DefTiFlashFastScan = false + DefMemoryUsageAlarmRatio = 0.7 + DefMemoryUsageAlarmKeepRecordNum = 5 + DefTiDBEnableFastReorg = true + DefTiDBDDLDiskQuota = 100 * 1024 * 1024 * 1024 // 100GB + DefExecutorConcurrency = 5 + DefTiDBEnableNonPreparedPlanCache = false + DefTiDBEnableNonPreparedPlanCacheForDML = false + DefTiDBNonPreparedPlanCacheSize = 100 + DefTiDBPlanCacheMaxPlanSize = 2 * size.MB + DefTiDBInstancePlanCacheMaxMemSize = 100 * size.MB + DefTiDBInstancePlanCacheReservedPercentage = 0.1 +>>>>>>> 50dcee7cd51 (ddl: introduce a new system 
variable to control the `store-write-bwlimit` when ingesting (#57145)) // MaxDDLReorgBatchSize is exported for testing. MaxDDLReorgBatchSize int32 = 10240 MinDDLReorgBatchSize int32 = 32 @@ -1491,6 +1730,7 @@ const ( // Process global variables. var ( +<<<<<<< HEAD ProcessGeneralLog = atomic.NewBool(false) RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) EnableAutoAnalyzePriorityQueue = atomic.NewBool(DefTiDBEnableAutoAnalyzePriorityQueue) @@ -1505,6 +1745,32 @@ var ( ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit ddlReorgRowFormat int64 = DefTiDBRowFormatV2 maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount +======= + ProcessGeneralLog = atomic.NewBool(false) + RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) + EnableAutoAnalyzePriorityQueue = atomic.NewBool(DefTiDBEnableAutoAnalyzePriorityQueue) + // AnalyzeColumnOptions is a global variable that indicates the default column choice for ANALYZE. + // The value of this variable is a string that can be one of the following values: + // "PREDICATE", "ALL". + // The behavior of the analyze operation depends on the value of `tidb_persist_analyze_options`: + // 1. If `tidb_persist_analyze_options` is enabled and the column choice from the analyze options record is set to `default`, + // the value of `tidb_analyze_column_options` determines the behavior of the analyze operation. + // 2. If `tidb_persist_analyze_options` is disabled, `tidb_analyze_column_options` is used directly to decide + // whether to analyze all columns or just the predicate columns. + AnalyzeColumnOptions = atomic.NewString(DefTiDBAnalyzeColumnOptions) + GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) + QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen) + EnablePProfSQLCPU = atomic.NewBool(false) + EnableBatchDML = atomic.NewBool(false) + EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency + ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit + ddlReorgRowFormat int64 = DefTiDBRowFormatV2 + DDLReorgMaxWriteSpeed = atomic.NewInt64(DefTiDBDDLReorgMaxWriteSpeed) + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount +>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145)) // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold ForcePriority = int32(DefTiDBForcePriority)
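The new tidb_ddl_reorg_max_write_speed variable accepts either a plain byte count or a human-readable size such as "2KB", "3MiB", or "4 gb", as exercised by TestSetDDLReorgMaxWriteSpeed above. Its SetGlobal hook parses the string with units.RAMInBytes, rejects anything outside [0, 1 PiB], and stores the result in the DDLReorgMaxWriteSpeed atomic, which the ingest config then reads. A self-contained sketch of that validate-and-store path (not the exact sysvar plumbing):

package main

import (
	"fmt"

	"github.com/docker/go-units"
	"go.uber.org/atomic"
)

// ddlReorgMaxWriteSpeed mimics variable.DDLReorgMaxWriteSpeed: the parsed
// limit in bytes per second that the ingest backend config reads.
var ddlReorgMaxWriteSpeed = atomic.NewInt64(0)

// setDDLReorgMaxWriteSpeed follows the SetGlobal hook shown in the sysvar.go
// hunk: parse a human-readable size, bound it to [0, 1 PiB], then store it.
func setDDLReorgMaxWriteSpeed(val string) error {
	i64, err := units.RAMInBytes(val) // accepts "100", "2KB", "3MiB", "4 gb", ...
	if err != nil {
		return err
	}
	if i64 < 0 || i64 > units.PiB {
		// 1 PiB is large enough, and RAMInBytes parses through a float64, so
		// much larger values could lose precision anyway.
		return fmt.Errorf("invalid value %d, it should be within [%d, %d]", i64, 0, int64(units.PiB))
	}
	ddlReorgMaxWriteSpeed.Store(i64)
	return nil
}

func main() {
	for _, v := range []string{"0", "2KB", "3MiB", "4 gb", "1PiB", "bogus"} {
		err := setDDLReorgMaxWriteSpeed(v)
		fmt.Printf("%-8q -> limit=%d err=%v\n", v, ddlReorgMaxWriteSpeed.Load(), err)
	}
}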