Commit 9a62d7f
This is an automated cherry-pick of pingcap#57145
Signed-off-by: ti-chi-bot <[email protected]>
CbcWestwolf authored and ti-chi-bot committed Nov 14, 2024
1 parent 9d5e282 commit 9a62d7f
Showing 8 changed files with 1,062 additions and 10 deletions.
25 changes: 20 additions & 5 deletions pkg/ddl/ddl.go
@@ -1458,7 +1458,7 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) {
 }
 
 // cancelRunningJob cancel a DDL job that is in the concurrent state.
-func cancelRunningJob(_ *sess.Session, job *model.Job,
+func cancelRunningJob(job *model.Job,
 	byWho model.AdminCommandOperator) (err error) {
 	// These states can't be cancelled.
 	if job.IsDone() || job.IsSynced() {
@@ -1479,7 +1479,7 @@ func cancelRunningJob(_ *sess.Session, job *model.Job,
 }
 
 // pauseRunningJob check and pause the running Job
-func pauseRunningJob(_ *sess.Session, job *model.Job,
+func pauseRunningJob(job *model.Job,
 	byWho model.AdminCommandOperator) (err error) {
 	if job.IsPausing() || job.IsPaused() {
 		return dbterror.ErrPausedDDLJob.GenWithStackByArgs(job.ID)
@@ -1498,7 +1498,7 @@ func pauseRunningJob(_ *sess.Session, job *model.Job,
 }
 
 // resumePausedJob check and resume the Paused Job
-func resumePausedJob(_ *sess.Session, job *model.Job,
+func resumePausedJob(job *model.Job,
 	byWho model.AdminCommandOperator) (err error) {
 	if !job.IsResumable() {
 		errMsg := fmt.Sprintf("job has not been paused, job state:%s, schema state:%s",
@@ -1518,7 +1518,13 @@ func resumePausedJob(_ *sess.Session, job *model.Job,
 }
 
 // processJobs command on the Job according to the process
+<<<<<<< HEAD
 func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
+=======
+func processJobs(
+	ctx context.Context,
+	process func(*model.Job, model.AdminCommandOperator) (err error),
+>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
 	sessCtx sessionctx.Context,
 	ids []int64,
 	byWho model.AdminCommandOperator) (jobErrs []error, err error) {
@@ -1564,7 +1570,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
 		}
 		delete(jobMap, job.ID)
 
-		err = process(ns, job, byWho)
+		err = process(job, byWho)
 		if err != nil {
 			jobErrs[i] = err
 			continue
@@ -1629,8 +1635,17 @@ func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) {
 }
 
 // pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage.
+<<<<<<< HEAD
 func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
 	se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) {
+=======
+func processAllJobs(
+	ctx context.Context,
+	process func(*model.Job, model.AdminCommandOperator) (err error),
+	se sessionctx.Context,
+	byWho model.AdminCommandOperator,
+) (map[int64]error, error) {
+>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
 	var err error
 	var jobErrs = make(map[int64]error)
 
@@ -1655,7 +1670,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
 	}
 
 	for _, job := range jobs {
-		err = process(ns, job, byWho)
+		err = process(job, byWho)
 		if err != nil {
 			jobErrs[job.ID] = err
 			continue
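The pkg/ddl/ddl.go side of this cherry-pick is a signature cleanup: the admin-command callbacks (cancelRunningJob, pauseRunningJob, resumePausedJob) never used their *sess.Session argument (note the `_`), so the master-side change drops it and threads a context.Context through processJobs/processAllJobs instead, shrinking the call sites from process(ns, job, byWho) to process(job, byWho). Below is a minimal, self-contained sketch of that refactor; Job, AdminCommandOperator, and the loop body are stand-ins for illustration, not TiDB's actual definitions.

// Sketch only: stand-in types, not TiDB's model/sess packages.
package main

import (
    "context"
    "fmt"
)

type Job struct{ ID int64 }

type AdminCommandOperator int

const (
    AdminCommandByEndUser AdminCommandOperator = iota + 1
    AdminCommandBySystem
)

// Old shape: process func(*sess.Session, *Job, AdminCommandOperator) error,
// even though every callback ignored the session. The new shape drops the
// dead parameter, and the caller accepts a context instead.
func processJobs(
    ctx context.Context,
    process func(*Job, AdminCommandOperator) error,
    ids []int64,
    byWho AdminCommandOperator,
) []error {
    errs := make([]error, len(ids))
    for i, id := range ids {
        if err := ctx.Err(); err != nil { // stop early if the caller cancelled
            errs[i] = err
            continue
        }
        errs[i] = process(&Job{ID: id}, byWho) // was: process(ns, job, byWho)
    }
    return errs
}

func main() {
    cancelJob := func(job *Job, byWho AdminCommandOperator) error {
        fmt.Printf("cancelling job %d (operator %d)\n", job.ID, byWho)
        return nil
    }
    _ = processJobs(context.Background(), cancelJob, []int64{101, 102}, AdminCommandBySystem)
}

Because the callbacks depend only on the job, the conflict above is mechanical to resolve: keep the master-side, context-taking signatures between the ======= and >>>>>>> markers.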
18 changes: 18 additions & 0 deletions pkg/ddl/ingest/config.go
@@ -42,10 +42,28 @@ type Config struct {
 	IsRaftKV2 bool
 }
 
+<<<<<<< HEAD
 func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool) (*Config, error) {
 	tidbCfg := tidb.GetGlobalConfig()
 	cfg := lightning.NewConfig()
 	cfg.TikvImporter.Backend = lightning.BackendLocal
+=======
+		// lighting default values
+		CheckpointEnabled:           true,
+		BlockSize:                   lightning.DefaultBlockSize,
+		KVWriteBatchSize:            lightning.KVWriteBatchSize,
+		RegionSplitBatchSize:        lightning.DefaultRegionSplitBatchSize,
+		RegionSplitConcurrency:      runtime.GOMAXPROCS(0),
+		MemTableSize:                lightning.DefaultEngineMemCacheSize,
+		LocalWriterMemCacheSize:     lightning.DefaultLocalWriterMemCacheSize,
+		ShouldCheckTiKV:             true,
+		MaxOpenFiles:                int(litRLimit),
+		PausePDSchedulerScope:       lightning.PausePDSchedulerScopeTable,
+		TaskType:                    kvutil.ExplicitTypeDDL,
+		DisableAutomaticCompactions: true,
+		StoreWriteBWLimit:           int(variable.DDLReorgMaxWriteSpeed.Load()),
+	}
+>>>>>>> 50dcee7cd51 (ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145))
 	// Each backend will build a single dir in lightning dir.
 	cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, EncodeBackendTag(jobID))
 	if ImporterRangeConcurrencyForTest != nil {
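The pkg/ddl/ingest/config.go hunk carries the substantive change named in the PR title: when the lightning (ingest) backend config is built, StoreWriteBWLimit is seeded from variable.DDLReorgMaxWriteSpeed.Load(), so the per-store write bandwidth cap that lightning passes to TiKV as store-write-bwlimit is now driven by an atomically updated system variable (presumably the tidb_ddl_reorg_max_write_speed variable this PR introduces; that name and the details below are assumptions, as the diff shows only the Load() call). A self-contained sketch of the pattern using sync/atomic:

// Sketch only: a global, atomically updated limit sampled when a config is
// built, mirroring StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()).
package main

import (
    "fmt"
    "sync/atomic"
)

// Stand-in for variable.DDLReorgMaxWriteSpeed: SET GLOBAL-style updates
// Store into it; config construction Loads from it.
var ddlReorgMaxWriteSpeed atomic.Int64

type ingestConfig struct {
    // Stand-in for the local backend's StoreWriteBWLimit (bytes/sec),
    // forwarded to TiKV as store-write-bwlimit during ingest.
    StoreWriteBWLimit int
}

func genConfig() ingestConfig {
    // Sampled once per backend config: ingest jobs started after an update
    // pick up the new limit; already-running jobs keep the value they read.
    return ingestConfig{StoreWriteBWLimit: int(ddlReorgMaxWriteSpeed.Load())}
}

func main() {
    ddlReorgMaxWriteSpeed.Store(100 << 20) // e.g. cap ingest writes at 100 MiB/s
    fmt.Printf("%+v\n", genConfig())
}

Reading the variable once at config-build time, rather than on every write, keeps the hot ingest path free of synchronization beyond a single atomic load.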