Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#57145
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CbcWestwolf authored and ti-chi-bot committed Nov 14, 2024
1 parent 1a581e1 commit 503d49e
Show file tree
Hide file tree
Showing 8 changed files with 1,096 additions and 10 deletions.
25 changes: 20 additions & 5 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,7 +1673,7 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) {
}

// cancelRunningJob cancels a DDL job that is in the concurrent state.
func cancelRunningJob(_ *sess.Session, job *model.Job,
func cancelRunningJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
Expand All @@ -1694,7 +1694,7 @@ func cancelRunningJob(_ *sess.Session, job *model.Job,
}

// pauseRunningJob checks and pauses the running Job
func pauseRunningJob(_ *sess.Session, job *model.Job,
func pauseRunningJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if job.IsPausing() || job.IsPaused() {
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(job.ID)
Expand All @@ -1713,7 +1713,7 @@ func pauseRunningJob(_ *sess.Session, job *model.Job,
}

// resumePausedJob checks and resumes the Paused Job
func resumePausedJob(_ *sess.Session, job *model.Job,
func resumePausedJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if !job.IsResumable() {
errMsg := fmt.Sprintf("job has not been paused, job state:%s, schema state:%s",
Expand All @@ -1733,7 +1733,13 @@ func resumePausedJob(_ *sess.Session, job *model.Job,
}

// processJobs runs the given process command on each of the Jobs
<<<<<<< HEAD
func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
=======
func processJobs(
ctx context.Context,
process func(*model.Job, model.AdminCommandOperator) (err error),
>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
sessCtx sessionctx.Context,
ids []int64,
byWho model.AdminCommandOperator) (jobErrs []error, err error) {
Expand Down Expand Up @@ -1779,7 +1785,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera
}
delete(jobMap, job.ID)

err = process(ns, job, byWho)
err = process(job, byWho)
if err != nil {
jobErrs[i] = err
continue
Expand Down Expand Up @@ -1844,8 +1850,17 @@ func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err e
}

// processAllJobs processes all the jobs in the job table, 100 jobs at a time, in case of high memory usage.
<<<<<<< HEAD
func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) {
=======
func processAllJobs(
ctx context.Context,
process func(*model.Job, model.AdminCommandOperator) (err error),
se sessionctx.Context,
byWho model.AdminCommandOperator,
) (map[int64]error, error) {
>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
var err error
var jobErrs = make(map[int64]error)

Expand All @@ -1870,7 +1885,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp
}

for _, job := range jobs {
err = process(ns, job, byWho)
err = process(job, byWho)
if err != nil {
jobErrs[job.ID] = err
continue
Expand Down
30 changes: 30 additions & 0 deletions pkg/ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,40 @@ func genConfig(
memRoot MemRoot,
unique bool,
resourceGroup string,
<<<<<<< HEAD
) (*litConfig, error) {
tidbCfg := tidb.GetGlobalConfig()
cfg := lightning.NewConfig()
cfg.TikvImporter.Backend = lightning.BackendLocal
=======
concurrency int,
) (*local.BackendConfig, error) {
cfg := &local.BackendConfig{
LocalStoreDir: jobSortPath,
ResourceGroupName: resourceGroup,
MaxConnPerStore: concurrency,
WorkerConcurrency: concurrency * 2,
KeyspaceName: tidb.GetGlobalKeyspaceName(),
// We disable the switch TiKV mode feature for now, because the impact is not
// fully tested.
ShouldCheckWriteStall: true,

	// lightning default values
CheckpointEnabled: true,
BlockSize: lightning.DefaultBlockSize,
KVWriteBatchSize: lightning.KVWriteBatchSize,
RegionSplitBatchSize: lightning.DefaultRegionSplitBatchSize,
RegionSplitConcurrency: runtime.GOMAXPROCS(0),
MemTableSize: lightning.DefaultEngineMemCacheSize,
LocalWriterMemCacheSize: lightning.DefaultLocalWriterMemCacheSize,
ShouldCheckTiKV: true,
MaxOpenFiles: int(litRLimit),
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()),
}
>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
// Each backend will build a single dir in lightning dir.
cfg.TikvImporter.SortedKVDir = jobSortPath
if ImporterRangeConcurrencyForTest != nil {
Expand Down
Loading

0 comments on commit 503d49e

Please sign in to comment.