-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: pessimistic lock global id, alloc id & insert ddl job in one txn #54547
Changes from all commits
e031e76
eed5c81
9031205
dc5f40a
4de6ccd
2ad8272
4a03cdb
940c1e1
1f8d67b
22f7bac
9ed43ad
20f3b2d
e5bd14b
db08387
74eb0c1
6b471da
96ebfe6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -17,6 +17,7 @@ package ddl | |||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||||
"math" | ||||||||||||||||||||||||||||||||
"math/rand" | ||||||||||||||||||||||||||||||||
"os" | ||||||||||||||||||||||||||||||||
"strconv" | ||||||||||||||||||||||||||||||||
|
@@ -48,6 +49,8 @@ import ( | |||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/pkg/util/resourcegrouptag" | ||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/pkg/util/topsql" | ||||||||||||||||||||||||||||||||
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state" | ||||||||||||||||||||||||||||||||
tikv "github.com/tikv/client-go/v2/kv" | ||||||||||||||||||||||||||||||||
"github.com/tikv/client-go/v2/oracle" | ||||||||||||||||||||||||||||||||
"github.com/tikv/client-go/v2/tikvrpc" | ||||||||||||||||||||||||||||||||
kvutil "github.com/tikv/client-go/v2/util" | ||||||||||||||||||||||||||||||||
clientv3 "go.etcd.io/etcd/client/v3" | ||||||||||||||||||||||||||||||||
|
@@ -267,6 +270,9 @@ func (d *ddl) addBatchDDLJobsV1(tasks []*limitJobTask) { | |||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// addBatchLocalDDLJobs gets global job IDs and delivery the DDL jobs to local TiDB | ||||||||||||||||||||||||||||||||
func (d *ddl) addBatchLocalDDLJobs(tasks []*limitJobTask) { | ||||||||||||||||||||||||||||||||
if newTasks, err := combineBatchCreateTableJobs(tasks); err == nil { | ||||||||||||||||||||||||||||||||
tasks = newTasks | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
err := d.addBatchDDLJobs(tasks) | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
for _, task := range tasks { | ||||||||||||||||||||||||||||||||
|
@@ -395,7 +401,6 @@ func setJobStateToQueueing(job *model.Job) { | |||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// addBatchDDLJobs gets global job IDs and puts the DDL jobs in the DDL job table or local worker. | ||||||||||||||||||||||||||||||||
func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { | ||||||||||||||||||||||||||||||||
var ids []int64 | ||||||||||||||||||||||||||||||||
var err error | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if len(tasks) == 0 { | ||||||||||||||||||||||||||||||||
|
@@ -407,11 +412,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { | |||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
defer d.sessPool.Put(se) | ||||||||||||||||||||||||||||||||
jobs, err := getJobsBySQL(sess.NewSession(se), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster)) | ||||||||||||||||||||||||||||||||
flashClusterJobs, err := getJobsBySQL(sess.NewSession(se), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster)) | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
if len(jobs) != 0 { | ||||||||||||||||||||||||||||||||
if len(flashClusterJobs) != 0 { | ||||||||||||||||||||||||||||||||
return errors.Errorf("Can't add ddl job, have flashback cluster job") | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
|
@@ -420,25 +425,14 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { | |||||||||||||||||||||||||||||||
bdrRole = string(ast.BDRRoleNone) | ||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if newTasks, err := combineBatchCreateTableJobs(tasks); err == nil { | ||||||||||||||||||||||||||||||||
tasks = newTasks | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) | ||||||||||||||||||||||||||||||||
// lock to reduce conflict | ||||||||||||||||||||||||||||||||
d.globalIDLock.Lock() | ||||||||||||||||||||||||||||||||
err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error { | ||||||||||||||||||||||||||||||||
t := meta.NewMeta(txn) | ||||||||||||||||||||||||||||||||
ids, err = t.GenGlobalIDs(len(tasks)) | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
bdrRole, err = t.GetBDRRole() | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
startTS = txn.StartTS() | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// for localmode, we still need to check this variable if upgrading below v6.2. | ||||||||||||||||||||||||||||||||
|
@@ -450,17 +444,15 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { | |||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||
d.globalIDLock.Unlock() | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
jobTasks := make([]*model.Job, 0, len(tasks)) | ||||||||||||||||||||||||||||||||
for i, task := range tasks { | ||||||||||||||||||||||||||||||||
jobs := make([]*model.Job, 0, len(tasks)) | ||||||||||||||||||||||||||||||||
for _, task := range tasks { | ||||||||||||||||||||||||||||||||
job := task.job | ||||||||||||||||||||||||||||||||
job.Version = currentVersion | ||||||||||||||||||||||||||||||||
job.StartTS = startTS | ||||||||||||||||||||||||||||||||
job.ID = ids[i] | ||||||||||||||||||||||||||||||||
job.BDRRole = bdrRole | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// BDR mode only affects the DDL not from CDC | ||||||||||||||||||||||||||||||||
|
@@ -492,28 +484,155 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { | |||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
jobTasks = append(jobTasks, job) | ||||||||||||||||||||||||||||||||
jobs = append(jobs, job) | ||||||||||||||||||||||||||||||||
injectModifyJobArgFailPoint(job) | ||||||||||||||||||||||||||||||||
if !job.LocalMode { | ||||||||||||||||||||||||||||||||
d.initJobDoneCh(job.ID) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if tasks[0].job.LocalMode { | ||||||||||||||||||||||||||||||||
ddlSe := sess.NewSession(se) | ||||||||||||||||||||||||||||||||
localMode := tasks[0].job.LocalMode | ||||||||||||||||||||||||||||||||
if localMode { | ||||||||||||||||||||||||||||||||
if err = fillJobIDs(ctx, ddlSe, tasks); err != nil { | ||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
for _, task := range tasks { | ||||||||||||||||||||||||||||||||
d.localJobCh <- task | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return errors.Trace(insertDDLJobs2Table(sess.NewSession(se), true, jobTasks...)) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if err = GenIDAndInsertJobsWithRetry(ctx, ddlSe, jobs); err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
for _, job := range jobs { | ||||||||||||||||||||||||||||||||
d.initJobDoneCh(job.ID) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// GenIDAndInsertJobsWithRetry generate job ID and inserts DDL jobs to the DDL job | ||||||||||||||||||||||||||||||||
// table with retry. job id allocation and job insertion are in the same transaction, | ||||||||||||||||||||||||||||||||
// as we want to make sure DDL jobs are inserted in id order, then we can query from | ||||||||||||||||||||||||||||||||
// a min job ID when scheduling DDL jobs to mitigate https://github.com/pingcap/tidb/issues/52905. | ||||||||||||||||||||||||||||||||
// so this function has side effect, it will set the job id of 'jobs'. | ||||||||||||||||||||||||||||||||
func GenIDAndInsertJobsWithRetry(ctx context.Context, ddlSe *sess.Session, jobs []*model.Job) error { | ||||||||||||||||||||||||||||||||
return genIDAndCallWithRetry(ctx, ddlSe, len(jobs), func(ids []int64) error { | ||||||||||||||||||||||||||||||||
for idx := range jobs { | ||||||||||||||||||||||||||||||||
jobs[idx].ID = ids[idx] | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return insertDDLJobs2Table(ctx, ddlSe, jobs...) | ||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
func fillJobIDs(ctx context.Context, ddlSe *sess.Session, tasks []*limitJobTask) error { | ||||||||||||||||||||||||||||||||
var allocatedIDs []int64 | ||||||||||||||||||||||||||||||||
if err := genIDAndCallWithRetry(ctx, ddlSe, len(tasks), func(ids []int64) error { | ||||||||||||||||||||||||||||||||
allocatedIDs = ids | ||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||
}); err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
for i, task := range tasks { | ||||||||||||||||||||||||||||||||
task.job.ID = allocatedIDs[i] | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// genIDAndCallWithRetry generates global IDs and calls the function with retry. | ||||||||||||||||||||||||||||||||
// generate ID and call function runs in the same transaction. | ||||||||||||||||||||||||||||||||
func genIDAndCallWithRetry(ctx context.Context, ddlSe *sess.Session, count int, fn func(ids []int64) error) error { | ||||||||||||||||||||||||||||||||
var resErr error | ||||||||||||||||||||||||||||||||
for i := uint(0); i < kv.MaxRetryCnt; i++ { | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not that much transaction retry is needed, as the conflict error is already handled by the in-transaction statements or operations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's reusing retry count of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||||||||||||||||||||||||||||||||
resErr = func() (err error) { | ||||||||||||||||||||||||||||||||
if err := ddlSe.Begin(ctx); err != nil { | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
what do you mean, we don't have transaction before this one
it's the same behavior as our previous impl. if such job submitted, job scheduler will stop it from running if there is a flashback job before There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i ever thought remove this checking, as job schedule will calculate dependency between jobs, but a little different with current behavior There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
OK.
I guess the purpose of checking flashback job before submitting is to prevent having potential wrong info in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. previously we cannot make sure there is no flashback job before this job, they might be submitted on different node now we can do it by check inside the submit transaction, but i don't want to change this part now, and it might hurt performance |
||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it's better to abstract L549 to L576 to a standalone function, using defer statement inside for loop increases the risks of mistakes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's inside an lambda function already, the defer will run after the it returns. |
||||||||||||||||||||||||||||||||
ddlSe.Rollback() | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||
txn, err := ddlSe.Txn() | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
txn.SetOption(kv.Pessimistic, true) | ||||||||||||||||||||||||||||||||
forUpdateTS, err := lockGlobalIDKey(ctx, ddlSe, txn) | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
txn.GetSnapshot().SetOption(kv.SnapshotTS, forUpdateTS) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
m := meta.NewMeta(txn) | ||||||||||||||||||||||||||||||||
ids, err := m.GenGlobalIDs(count) | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
if err = fn(ids); err != nil { | ||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return ddlSe.Commit(ctx) | ||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if resErr != nil && kv.IsTxnRetryableError(resErr) { | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure what's the meaning of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see https://github.com/tikv/client-go/blob/d73cc1ed6503925dfc7226e8d5677ceb4c2fd6f1/txnkv/transaction/2pc.go#L1227-L1230, and handled in tidb with below, seems no special handling Lines 1029 to 1040 in 06e0e17
i guess the reason is: T1 lock it so no one can change it before forUpdateTS, after the lock is expired & it hasn't commit, if there is T2 lock it later, T1 commit > T2 forUpdateTS > T1 forUpdateTS, one of them will report There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found this type of error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Lines 83 to 85 in 9044acb
only ErrTxnRetryable except write conflict
yes, that's why i ask @lcwangchao @cfzjywxk to review this part There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems we cannot avoid this on crash for pessimistic txn now
|
||||||||||||||||||||||||||||||||
logutil.DDLLogger().Warn("insert job meet retryable error", zap.Error(resErr)) | ||||||||||||||||||||||||||||||||
kv.BackOff(i) | ||||||||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return resErr | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// lockGlobalIDKey locks the global ID key in the meta store. it keeps trying if | ||||||||||||||||||||||||||||||||
// meet write conflict, we cannot have a fixed retry count for this error, see this | ||||||||||||||||||||||||||||||||
// https://github.com/pingcap/tidb/issues/27197#issuecomment-2216315057. | ||||||||||||||||||||||||||||||||
// this part is same as how we implement pessimistic + repeatable read isolation | ||||||||||||||||||||||||||||||||
// level in SQL executor, see doLockKeys. | ||||||||||||||||||||||||||||||||
// NextGlobalID is a meta key, so we cannot use "select xx for update", if we store | ||||||||||||||||||||||||||||||||
// it into a table row or using advisory lock, we will depends on a system table | ||||||||||||||||||||||||||||||||
// that is created by us, cyclic. although we can create a system table without using | ||||||||||||||||||||||||||||||||
// DDL logic, we will only consider change it when we have data dictionary and keep | ||||||||||||||||||||||||||||||||
// it this way now. | ||||||||||||||||||||||||||||||||
// TODO maybe we can unify the lock mechanism with SQL executor in the future, or | ||||||||||||||||||||||||||||||||
cfzjywxk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||
// implement it inside TiKV client-go. | ||||||||||||||||||||||||||||||||
func lockGlobalIDKey(ctx context.Context, ddlSe *sess.Session, txn kv.Transaction) (uint64, error) { | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not a big problem. We can get There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will keep it, don't want to get it again, this method is internal anyway. |
||||||||||||||||||||||||||||||||
var ( | ||||||||||||||||||||||||||||||||
iteration uint | ||||||||||||||||||||||||||||||||
forUpdateTs = txn.StartTS() | ||||||||||||||||||||||||||||||||
ver kv.Version | ||||||||||||||||||||||||||||||||
err error | ||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||
waitTime := ddlSe.GetSessionVars().LockWaitTimeout | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this session (ddlSe) is get from session pool, not user's session. Maybe get the lock wait value from user's session. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is part of internal DDL execution, internal not user setting should be used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if it's more reasonable that the setting of DDL's SQL connection can affect the timeout of DDL. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems only affects txn for DML in mysql, mysql don't have a ddl_job table like us, so it shouldn't affect DDL, a simple memory lock should be enough |
||||||||||||||||||||||||||||||||
m := meta.NewMeta(txn) | ||||||||||||||||||||||||||||||||
idKey := m.GlobalIDKey() | ||||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||||
lockCtx := tikv.NewLockCtx(forUpdateTs, waitTime, time.Now()) | ||||||||||||||||||||||||||||||||
err = txn.LockKeys(ctx, lockCtx, idKey) | ||||||||||||||||||||||||||||||||
if err == nil || !terror.ErrorEqual(kv.ErrWriteConflict, err) { | ||||||||||||||||||||||||||||||||
cfzjywxk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
// ErrWriteConflict contains a conflict-commit-ts in most case, but it cannot | ||||||||||||||||||||||||||||||||
// be used as forUpdateTs, see comments inside handleAfterPessimisticLockError | ||||||||||||||||||||||||||||||||
ver, err = ddlSe.GetStore().CurrentVersion(oracle.GlobalTxnScope) | ||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
forUpdateTs = ver.Ver | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
kv.BackOff(iteration) | ||||||||||||||||||||||||||||||||
// avoid it keep growing and overflow. | ||||||||||||||||||||||||||||||||
iteration = min(iteration+1, math.MaxInt) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
return forUpdateTs, err | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// combineBatchCreateTableJobs combine batch jobs to another batch jobs. | ||||||||||||||||||||||||||||||||
// currently it only support combine CreateTable to CreateTables. | ||||||||||||||||||||||||||||||||
func combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTask, error) { | ||||||||||||||||||||||||||||||||
if len(tasks) <= 1 || !tasks[0].job.LocalMode { | ||||||||||||||||||||||||||||||||
if len(tasks) <= 1 { | ||||||||||||||||||||||||||||||||
return tasks, nil | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
var schemaName string | ||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
local mode jobs are not written to ddl_job table, no need to lock the global ID? I'm afraid the performance for local job is more important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
memory lock is used to reduce write conflict, after later pr to "combine table id allocation with job id", there will be no write conflict in 1 node.
with the 2 pr mentioned in the pr, we can create 100k tables in about 13 minutes, and there is very little speed degradation, i will test 1M tables later, if everything goes ok, i still suggest deprecate fast-create in later version