Skip to content

Commit

Permalink
ddl: introduce a new system variable to control the `store-write-bwli…
Browse files Browse the repository at this point in the history
…mit` when ingesting (#57145) (#57376)

close #57156
  • Loading branch information
ti-chi-bot authored Dec 11, 2024
1 parent 2082f4b commit c0e0d45
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 15 deletions.
14 changes: 7 additions & 7 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) {
}

// cancelRunningJob cancel a DDL job that is in the concurrent state.
func cancelRunningJob(_ *sess.Session, job *model.Job,
func cancelRunningJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
Expand All @@ -1479,7 +1479,7 @@ func cancelRunningJob(_ *sess.Session, job *model.Job,
}

// pauseRunningJob check and pause the running Job
func pauseRunningJob(_ *sess.Session, job *model.Job,
func pauseRunningJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if job.IsPausing() || job.IsPaused() {
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(job.ID)
Expand All @@ -1498,7 +1498,7 @@ func pauseRunningJob(_ *sess.Session, job *model.Job,
}

// resumePausedJob check and resume the Paused Job
func resumePausedJob(_ *sess.Session, job *model.Job,
func resumePausedJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if !job.IsResumable() {
errMsg := fmt.Sprintf("job has not been paused, job state:%s, schema state:%s",
Expand All @@ -1518,7 +1518,7 @@ func resumePausedJob(_ *sess.Session, job *model.Job,
}

// processJobs command on the Job according to the process
func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
func processJobs(process func(*model.Job, model.AdminCommandOperator) (err error),
sessCtx sessionctx.Context,
ids []int64,
byWho model.AdminCommandOperator) (jobErrs []error, err error) {
Expand Down Expand Up @@ -1564,7 +1564,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera
}
delete(jobMap, job.ID)

err = process(ns, job, byWho)
err = process(job, byWho)
if err != nil {
jobErrs[i] = err
continue
Expand Down Expand Up @@ -1629,7 +1629,7 @@ func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err e
}

// pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage.
func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
func processAllJobs(process func(*model.Job, model.AdminCommandOperator) (err error),
se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) {
var err error
var jobErrs = make(map[int64]error)
Expand All @@ -1655,7 +1655,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp
}

for _, job := range jobs {
err = process(ns, job, byWho)
err = process(job, byWho)
if err != nil {
jobErrs[job.ID] = err
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
setJobStateToQueueing(job)

if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) {
if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil {
if err = pauseRunningJob(job, model.AdminCommandBySystem); err != nil {
logutil.BgLogger().Warn("pause user DDL by system failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(err))
task.cacheErr = err
continue
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
tidb "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
Expand All @@ -46,6 +47,7 @@ func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool) (
tidbCfg := tidb.GetGlobalConfig()
cfg := lightning.NewConfig()
cfg.TikvImporter.Backend = lightning.BackendLocal
cfg.TikvImporter.StoreWriteBWLimit = lightning.ByteSize(variable.DDLReorgMaxWriteSpeed.Load())
// Each backend will build a single dir in lightning dir.
cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, EncodeBackendTag(jobID))
if ImporterRangeConcurrencyForTest != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %?`
updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?`
deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?`
loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")"
loadGlobalVarsSQL = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")"
// KeyOpDefaultTimeout is the default timeout for each key operation.
KeyOpDefaultTimeout = 2 * time.Second
// KeyOpRetryInterval is the interval between two key operations.
Expand Down Expand Up @@ -184,20 +184,20 @@ func UpdateDeleteRange(sctx sessionctx.Context, dr DelRangeTask, newStartKey, ol
func LoadDDLReorgVars(ctx context.Context, sctx sessionctx.Context) error {
// close issue #21391
// variable.TiDBRowFormatVersion is used to encode the new row for column type change.
return LoadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion})
return loadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion})
}

// LoadDDLVars loads ddl variable from mysql.global_variables.
func LoadDDLVars(ctx sessionctx.Context) error {
return LoadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit})
return loadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit})
}

// LoadGlobalVars loads global variable from mysql.global_variables.
func LoadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error {
// loadGlobalVars loads global variable from mysql.global_variables.
func loadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
if e, ok := sctx.(sqlexec.RestrictedSQLExecutor); ok {
var buf strings.Builder
buf.WriteString(loadGlobalVars)
buf.WriteString(loadGlobalVarsSQL)
paramNames := make([]interface{}, 0, len(varNames))
for i, name := range varNames {
if i > 0 {
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/test/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 27,
shard_count = 28,
deps = [
"//pkg/config",
"//pkg/ddl/schematracker",
Expand Down Expand Up @@ -36,6 +36,7 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/dbterror",
"//pkg/util/dbterror/exeerrors",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
26 changes: 26 additions & 0 deletions pkg/executor/test/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/docker/go-units"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/schematracker"
ddltestutil "github.com/pingcap/tidb/pkg/ddl/testutil"
Expand Down Expand Up @@ -1183,6 +1184,31 @@ func TestSetDDLErrorCountLimit(t *testing.T) {
res.Check(testkit.Rows("100"))
}

func TestSetDDLReorgMaxWriteSpeed(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.Equal(t, int64(variable.DefTiDBDDLReorgMaxWriteSpeed), variable.DDLReorgMaxWriteSpeed.Load())

// valid values
for _, val := range []int64{1, 0, 100, 1024 * 1024, 2147483647, units.PiB} {
tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = %d", val))
require.Equal(t, val, variable.DDLReorgMaxWriteSpeed.Load())
tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(val, 10)))
}
for _, val := range []string{"1", "0", "100", "2KB", "3MiB", "4 gb", "2147483647", "1125899906842624" /* 1PiB */} {
tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = '%s'", val))
expected, err := units.RAMInBytes(val)
require.NoError(t, err)
require.Equal(t, expected, variable.DDLReorgMaxWriteSpeed.Load())
tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(expected, 10)))
}

// invalid values
tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = -1")
tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = invalid_val")
tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = %d", units.PiB+1)
}

func TestLoadDDLDistributeVars(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_library(
"//pkg/util/tls",
"//pkg/util/topsql/state",
"//pkg/util/versioninfo",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//kv",
Expand Down
18 changes: 18 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/keyspace"
Expand Down Expand Up @@ -739,6 +740,23 @@ var defaultSysVars = []*SysVar{
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
return nil
}},
{Scope: ScopeGlobal, Name: TiDBDDLReorgMaxWriteSpeed, Value: strconv.Itoa(DefTiDBDDLReorgMaxWriteSpeed), Type: TypeStr,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
i64, err := units.RAMInBytes(val)
if err != nil {
return errors.Trace(err)
}
if i64 < 0 || i64 > units.PiB {
// Here we limit the max value to 1 PiB instead of math.MaxInt64, since:
// 1. it is large enough
// 2. units.RAMInBytes would first cast the size to a float, and may lose precision when the size is too large
return fmt.Errorf("invalid value for '%d', it should be within [%d, %d]", i64, 0, units.PiB)
}
DDLReorgMaxWriteSpeed.Store(i64)
return nil
}, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
return strconv.FormatInt(DDLReorgMaxWriteSpeed.Load(), 10), nil
}},
{Scope: ScopeGlobal, Name: TiDBDDLErrorCountLimit, Value: strconv.Itoa(DefTiDBDDLErrorCountLimit), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt64, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
SetDDLErrorCountLimit(TidbOptInt64(val, DefTiDBDDLErrorCountLimit))
return nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ const (
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// TiDBDDLReorgMaxWriteSpeed defines the max write limitation for the lightning local backend
TiDBDDLReorgMaxWriteSpeed = "tidb_ddl_reorg_max_write_speed"

// TiDBEnableAutoIncrementInGenerated disables the mysql compatibility check on using auto-incremented columns in
// expression indexes and generated columns described here https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html for details.
TiDBEnableAutoIncrementInGenerated = "tidb_enable_auto_increment_in_generated"
Expand Down Expand Up @@ -1210,6 +1213,7 @@ const (
DefTiDBDDLReorgBatchSize = 256
DefTiDBDDLFlashbackConcurrency = 64
DefTiDBDDLErrorCountLimit = 512
DefTiDBDDLReorgMaxWriteSpeed = 0
DefTiDBMaxDeltaSchemaCount = 1024
DefTiDBPlacementMode = PlacementModeStrict
DefTiDBEnableAutoIncrementInGenerated = false
Expand Down Expand Up @@ -1440,6 +1444,7 @@ var (
ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency
ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit
ddlReorgRowFormat int64 = DefTiDBRowFormatV2
DDLReorgMaxWriteSpeed = atomic.NewInt64(DefTiDBDDLReorgMaxWriteSpeed)
maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount
// DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold
Expand Down

0 comments on commit c0e0d45

Please sign in to comment.