diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 4bb80dc5243dd..d5f833058ae50 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -1451,7 +1451,7 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) { } // cancelRunningJob cancel a DDL job that is in the concurrent state. -func cancelRunningJob(_ *sess.Session, job *model.Job, +func cancelRunningJob(job *model.Job, byWho model.AdminCommandOperator) (err error) { // These states can't be cancelled. if job.IsDone() || job.IsSynced() { @@ -1472,7 +1472,7 @@ func cancelRunningJob(_ *sess.Session, job *model.Job, } // pauseRunningJob check and pause the running Job -func pauseRunningJob(_ *sess.Session, job *model.Job, +func pauseRunningJob(job *model.Job, byWho model.AdminCommandOperator) (err error) { if job.IsPausing() || job.IsPaused() { return dbterror.ErrPausedDDLJob.GenWithStackByArgs(job.ID) @@ -1491,7 +1491,7 @@ func pauseRunningJob(_ *sess.Session, job *model.Job, } // resumePausedJob check and resume the Paused Job -func resumePausedJob(_ *sess.Session, job *model.Job, +func resumePausedJob(job *model.Job, byWho model.AdminCommandOperator) (err error) { if !job.IsResumable() { errMsg := fmt.Sprintf("job has not been paused, job state:%s, schema state:%s", @@ -1511,7 +1511,7 @@ func resumePausedJob(_ *sess.Session, job *model.Job, } // processJobs command on the Job according to the process -func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), +func processJobs(process func(*model.Job, model.AdminCommandOperator) (err error), sessCtx sessionctx.Context, ids []int64, byWho model.AdminCommandOperator) (jobErrs []error, err error) { @@ -1557,7 +1557,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera } delete(jobMap, job.ID) - err = process(ns, job, byWho) + err = process(job, byWho) if err != nil { jobErrs[i] = err continue @@ -1622,7 +1622,7 @@ func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err e } // pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage. -func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error), +func processAllJobs(process func(*model.Job, model.AdminCommandOperator) (err error), se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) { var err error var jobErrs = make(map[int64]error) @@ -1648,7 +1648,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp } for _, job := range jobs { - err = process(ns, job, byWho) + err = process(job, byWho) if err != nil { jobErrs[job.ID] = err continue diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 074506df392b5..99308c8ec06ff 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -397,7 +397,7 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error { setJobStateToQueueing(job) if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) { - if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil { + if err = pauseRunningJob(job, model.AdminCommandBySystem); err != nil { logutil.BgLogger().Warn("pause user DDL by system failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(err)) task.cacheErr = err continue diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go index f29f55e01452e..447958c54abc2 100644 --- a/pkg/ddl/ingest/config.go +++ b/pkg/ddl/ingest/config.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" lightning "github.com/pingcap/tidb/br/pkg/lightning/config" tidb "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" @@ -46,6 +47,7 @@ func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool) ( tidbCfg := tidb.GetGlobalConfig() cfg := lightning.NewConfig() cfg.TikvImporter.Backend = lightning.BackendLocal + cfg.TikvImporter.StoreWriteBWLimit = lightning.ByteSize(variable.DDLReorgMaxWriteSpeed.Load()) // Each backend will build a single dir in lightning dir. cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, EncodeBackendTag(jobID)) if ImporterRangeConcurrencyForTest != nil { diff --git a/pkg/ddl/util/util.go b/pkg/ddl/util/util.go index 8a56d64122260..80216b23f8bcf 100644 --- a/pkg/ddl/util/util.go +++ b/pkg/ddl/util/util.go @@ -46,7 +46,7 @@ const ( completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %?` updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` - loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")" + loadGlobalVarsSQL = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")" // KeyOpDefaultTimeout is the default timeout for each key operation. KeyOpDefaultTimeout = 2 * time.Second // KeyOpRetryInterval is the interval between two key operations. @@ -183,20 +183,20 @@ func UpdateDeleteRange(sctx sessionctx.Context, dr DelRangeTask, newStartKey, ol func LoadDDLReorgVars(ctx context.Context, sctx sessionctx.Context) error { // close issue #21391 // variable.TiDBRowFormatVersion is used to encode the new row for column type change. - return LoadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion}) + return loadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion}) } // LoadDDLVars loads ddl variable from mysql.global_variables. func LoadDDLVars(ctx sessionctx.Context) error { - return LoadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit}) + return loadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit}) } -// LoadGlobalVars loads global variable from mysql.global_variables. -func LoadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error { +// loadGlobalVars loads global variable from mysql.global_variables. +func loadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) if e, ok := sctx.(sqlexec.RestrictedSQLExecutor); ok { var buf strings.Builder - buf.WriteString(loadGlobalVars) + buf.WriteString(loadGlobalVarsSQL) paramNames := make([]interface{}, 0, len(varNames)) for i, name := range varNames { if i > 0 { diff --git a/pkg/executor/test/ddl/BUILD.bazel b/pkg/executor/test/ddl/BUILD.bazel index 9ff2e29495c50..902c24e1c420f 100644 --- a/pkg/executor/test/ddl/BUILD.bazel +++ b/pkg/executor/test/ddl/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 27, + shard_count = 28, deps = [ "//pkg/config", "//pkg/ddl/schematracker", @@ -36,6 +36,7 @@ go_test( "//pkg/util/chunk", "//pkg/util/dbterror", "//pkg/util/dbterror/exeerrors", + "@com_github_docker_go_units//:go-units", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", diff --git a/pkg/executor/test/ddl/ddl_test.go b/pkg/executor/test/ddl/ddl_test.go index 2e1ed3c901cc1..286c963f64fbc 100644 --- a/pkg/executor/test/ddl/ddl_test.go +++ b/pkg/executor/test/ddl/ddl_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/docker/go-units" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/schematracker" ddltestutil "github.com/pingcap/tidb/pkg/ddl/testutil" @@ -1183,6 +1184,31 @@ func TestSetDDLErrorCountLimit(t *testing.T) { res.Check(testkit.Rows("100")) } +func TestSetDDLReorgMaxWriteSpeed(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + require.Equal(t, int64(variable.DefTiDBDDLReorgMaxWriteSpeed), variable.DDLReorgMaxWriteSpeed.Load()) + + // valid values + for _, val := range []int64{1, 0, 100, 1024 * 1024, 2147483647, units.PiB} { + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = %d", val)) + require.Equal(t, val, variable.DDLReorgMaxWriteSpeed.Load()) + tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(val, 10))) + } + for _, val := range []string{"1", "0", "100", "2KB", "3MiB", "4 gb", "2147483647", "1125899906842624" /* 1PiB */} { + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = '%s'", val)) + expected, err := units.RAMInBytes(val) + require.NoError(t, err) + require.Equal(t, expected, variable.DDLReorgMaxWriteSpeed.Load()) + tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(expected, 10))) + } + + // invalid values + tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = -1") + tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = invalid_val") + tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = %d", units.PiB+1) +} + func TestLoadDDLDistributeVars(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/pkg/sessionctx/variable/BUILD.bazel b/pkg/sessionctx/variable/BUILD.bazel index cf4cca28db875..04be4c9752abc 100644 --- a/pkg/sessionctx/variable/BUILD.bazel +++ b/pkg/sessionctx/variable/BUILD.bazel @@ -67,6 +67,7 @@ go_library( "//pkg/util/tls", "//pkg/util/topsql/state", "//pkg/util/versioninfo", + "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//kv", diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 444418fd01ec0..54812b97e4185 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/keyspace" @@ -739,6 +740,23 @@ var defaultSysVars = []*SysVar{ SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) return nil }}, + {Scope: ScopeGlobal, Name: TiDBDDLReorgMaxWriteSpeed, Value: strconv.Itoa(DefTiDBDDLReorgMaxWriteSpeed), Type: TypeStr, + SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + i64, err := units.RAMInBytes(val) + if err != nil { + return errors.Trace(err) + } + if i64 < 0 || i64 > units.PiB { + // Here we limit the max value to 1 PiB instead of math.MaxInt64, since: + // 1. it is large enough + // 2. units.RAMInBytes would first cast the size to a float, and may lose precision when the size is too large + return fmt.Errorf("invalid value for '%d', it should be within [%d, %d]", i64, 0, units.PiB) + } + DDLReorgMaxWriteSpeed.Store(i64) + return nil + }, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) { + return strconv.FormatInt(DDLReorgMaxWriteSpeed.Load(), 10), nil + }}, {Scope: ScopeGlobal, Name: TiDBDDLErrorCountLimit, Value: strconv.Itoa(DefTiDBDDLErrorCountLimit), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt64, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { SetDDLErrorCountLimit(TidbOptInt64(val, DefTiDBDDLErrorCountLimit)) return nil diff --git a/pkg/sessionctx/variable/tidb_vars.go b/pkg/sessionctx/variable/tidb_vars.go index e10aa9c5bbb9f..770e566050268 100644 --- a/pkg/sessionctx/variable/tidb_vars.go +++ b/pkg/sessionctx/variable/tidb_vars.go @@ -506,6 +506,9 @@ const ( // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" + // TiDBDDLReorgMaxWriteSpeed defines the max write limitation for the lightning local backend + TiDBDDLReorgMaxWriteSpeed = "tidb_ddl_reorg_max_write_speed" + // TiDBEnableAutoIncrementInGenerated disables the mysql compatibility check on using auto-incremented columns in // expression indexes and generated columns described here https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html for details. TiDBEnableAutoIncrementInGenerated = "tidb_enable_auto_increment_in_generated" @@ -1210,6 +1213,7 @@ const ( DefTiDBDDLReorgBatchSize = 256 DefTiDBDDLFlashbackConcurrency = 64 DefTiDBDDLErrorCountLimit = 512 + DefTiDBDDLReorgMaxWriteSpeed = 0 DefTiDBMaxDeltaSchemaCount = 1024 DefTiDBPlacementMode = PlacementModeStrict DefTiDBEnableAutoIncrementInGenerated = false @@ -1440,6 +1444,7 @@ var ( ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit ddlReorgRowFormat int64 = DefTiDBRowFormatV2 + DDLReorgMaxWriteSpeed = atomic.NewInt64(DefTiDBDDLReorgMaxWriteSpeed) maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold