ddl: pessimistic lock global id, alloc id & insert ddl job in one txn #54547

Merged: 17 commits, Jul 16, 2024
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -229,6 +229,7 @@ go_test(
"ddl_test.go",
"ddl_worker_test.go",
"ddl_workerpool_test.go",
"executor_test.go",
"export_test.go",
"fail_test.go",
"foreign_key_test.go",
171 changes: 145 additions & 26 deletions pkg/ddl/ddl_worker.go
@@ -17,6 +17,7 @@ package ddl
import (
"context"
"fmt"
"math"
"math/rand"
"os"
"strconv"
@@ -48,6 +49,8 @@ import (
"github.com/pingcap/tidb/pkg/util/resourcegrouptag"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
tikv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
kvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -267,6 +270,9 @@ func (d *ddl) addBatchDDLJobsV1(tasks []*limitJobTask) {

// addBatchLocalDDLJobs gets global job IDs and delivers the DDL jobs to the local TiDB
func (d *ddl) addBatchLocalDDLJobs(tasks []*limitJobTask) {
if newTasks, err := combineBatchCreateTableJobs(tasks); err == nil {
tasks = newTasks
}
err := d.addBatchDDLJobs(tasks)
if err != nil {
for _, task := range tasks {
@@ -395,7 +401,6 @@ func setJobStateToQueueing(job *model.Job) {

// addBatchDDLJobs gets global job IDs and puts the DDL jobs in the DDL job table or local worker.
func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
- var ids []int64
var err error

if len(tasks) == 0 {
@@ -407,11 +412,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
return errors.Trace(err)
}
defer d.sessPool.Put(se)
- jobs, err := getJobsBySQL(sess.NewSession(se), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
flashClusterJobs, err := getJobsBySQL(sess.NewSession(se), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
if err != nil {
return errors.Trace(err)
}
- if len(jobs) != 0 {
if len(flashClusterJobs) != 0 {
return errors.Errorf("Can't add ddl job, have flashback cluster job")
}

@@ -420,25 +425,14 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
bdrRole = string(ast.BDRRoleNone)
)

- if newTasks, err := combineBatchCreateTableJobs(tasks); err == nil {
- tasks = newTasks
- }

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
- // lock to reduce conflict
- d.globalIDLock.Lock()
err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
- ids, err = t.GenGlobalIDs(len(tasks))
- if err != nil {
- return errors.Trace(err)
- }

bdrRole, err = t.GetBDRRole()
if err != nil {
return errors.Trace(err)
}

startTS = txn.StartTS()

// for local mode, we still need to check this variable when upgrading from below v6.2.
@@ -450,17 +444,15 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {

return nil
})
- d.globalIDLock.Unlock()
if err != nil {
return errors.Trace(err)
}

- jobTasks := make([]*model.Job, 0, len(tasks))
- for i, task := range tasks {
jobs := make([]*model.Job, 0, len(tasks))
for _, task := range tasks {
job := task.job
job.Version = currentVersion
job.StartTS = startTS
- job.ID = ids[i]
job.BDRRole = bdrRole

// BDR mode only affects the DDL not from CDC
Expand Down Expand Up @@ -492,28 +484,155 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
return err
}

- jobTasks = append(jobTasks, job)
jobs = append(jobs, job)
injectModifyJobArgFailPoint(job)
- if !job.LocalMode {
- d.initJobDoneCh(job.ID)
- }
}

se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)

- if tasks[0].job.LocalMode {
ddlSe := sess.NewSession(se)
localMode := tasks[0].job.LocalMode
if localMode {
if err = fillJobIDs(ctx, ddlSe, tasks); err != nil {
Contributor (review comment):
Local mode jobs are not written to the ddl_job table, so no need to lock the global ID? I'm afraid the performance of local jobs is more important.

Contributor Author:
> no need to lock the global ID?

The memory lock is used to reduce write conflicts; after a later PR to "combine table id allocation with job id", there will be no write conflict within one node.

With the 2 PRs mentioned in this PR, we can create 100k tables in about 13 minutes, with very little speed degradation. I will test 1M tables later; if everything goes OK, I still suggest deprecating fast-create in a later version.

[image: benchmark screenshot]
return err
}
for _, task := range tasks {
d.localJobCh <- task
}
return nil
}
- return errors.Trace(insertDDLJobs2Table(sess.NewSession(se), true, jobTasks...))

if err = GenIDAndInsertJobsWithRetry(ctx, ddlSe, jobs); err != nil {
return errors.Trace(err)
}
for _, job := range jobs {
d.initJobDoneCh(job.ID)
}

return nil
}

// GenIDAndInsertJobsWithRetry generates job IDs and inserts DDL jobs into the DDL
// job table with retry. Job ID allocation and job insertion are in the same
// transaction, to make sure DDL jobs are inserted in job ID order; then we can
// query from a minimal job ID when scheduling DDL jobs, to mitigate
// https://github.com/pingcap/tidb/issues/52905.
// Note: this function has a side effect, it sets the job ID of 'jobs'.
func GenIDAndInsertJobsWithRetry(ctx context.Context, ddlSe *sess.Session, jobs []*model.Job) error {
return genIDAndCallWithRetry(ctx, ddlSe, len(jobs), func(ids []int64) error {
for idx := range jobs {
jobs[idx].ID = ids[idx]
}
return insertDDLJobs2Table(ctx, ddlSe, jobs...)
})
}
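To illustrate the "query from a min job ID" point in the comment above: a minimal, self-contained sketch (illustrative types, not TiDB's actual scheduler) of why inserting jobs in ID order helps. Because IDs are allocated and rows inserted in one transaction, no job with a smaller ID can appear after a batch commits, so a scheduler can advance a watermark instead of rescanning the whole job table (cf. issue #52905).

package main

import "fmt"

type job struct{ id int64 }

// scheduler keeps a watermark: the smallest job ID that may still be unprocessed.
type scheduler struct {
	watermark int64
}

// poll returns jobs with id >= watermark and advances the watermark. Advancing
// is only safe because no job with a smaller ID can show up later.
func (s *scheduler) poll(table []job) []job {
	var due []job
	for _, j := range table {
		if j.id >= s.watermark {
			due = append(due, j)
		}
	}
	if n := len(due); n > 0 {
		s.watermark = due[n-1].id + 1
	}
	return due
}

func main() {
	s := &scheduler{}
	fmt.Println(s.poll([]job{{1}, {2}}))      // [{1} {2}]
	fmt.Println(s.poll([]job{{1}, {2}, {3}})) // [{3}]
}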

func fillJobIDs(ctx context.Context, ddlSe *sess.Session, tasks []*limitJobTask) error {
var allocatedIDs []int64
if err := genIDAndCallWithRetry(ctx, ddlSe, len(tasks), func(ids []int64) error {
allocatedIDs = ids
return nil
}); err != nil {
return errors.Trace(err)
}

for i, task := range tasks {
task.job.ID = allocatedIDs[i]
}
return nil
}

// genIDAndCallWithRetry generates global IDs and calls the function with retry.
// ID generation and the function call run in the same transaction.
func genIDAndCallWithRetry(ctx context.Context, ddlSe *sess.Session, count int, fn func(ids []int64) error) error {
var resErr error
for i := uint(0); i < kv.MaxRetryCnt; i++ {
Contributor (review comment):
Not that much transaction retry is needed, as the conflict error is already handled by the in-transaction statements or operations.

Contributor Author:
It's reusing the retry count of kv.RunInNewTxn, in case of other retryable errors.

Contributor:
kv.RunInNewTxn is executed in optimistic mode, so the write conflict error needs to be handled when doing commit. The transaction commit should succeed in most cases when a pessimistic lock is used. Not a big problem to try more times.
resErr = func() (err error) {
if err := ddlSe.Begin(ctx); err != nil {
Contributor (review comment):
If we Begin() here, previous statements executed by ddlSe will be committed implicitly. For example, the flashback jobs check may be outdated.

Contributor Author:
> If we Begin() here, previous statements executed by ddlSe will be committed implicitly.

What do you mean? We don't have a transaction before this one.

> flashback jobs checking maybe outdated

It's the same behavior as our previous implementation. If such a job is submitted, the job scheduler will stop it from running if there is a flashback job before it.

Contributor Author:
I once thought about removing this check, as the job scheduler calculates dependencies between jobs, but that is a little different from the current behavior.

Contributor:
> it's the same behavior as our previous impl.

OK.

> job scheduler will stop it from running

I guess the purpose of checking for a flashback job before submitting is to prevent having potentially wrong info in job.Args.

Contributor Author:
Previously we could not make sure there was no flashback job before this job; they might be submitted on different nodes. Now we could check inside the submit transaction, but I don't want to change this part now, and it might hurt performance.

return errors.Trace(err)
}
defer func() {
if err != nil {
Contributor (review comment):
Maybe it's better to abstract L549 to L576 into a standalone function; using a defer statement inside a for loop increases the risk of mistakes.

Contributor Author:
It's inside a lambda function already; the defer runs after it returns.
ddlSe.Rollback()
}
}()
txn, err := ddlSe.Txn()
if err != nil {
return errors.Trace(err)
}
txn.SetOption(kv.Pessimistic, true)
forUpdateTS, err := lockGlobalIDKey(ctx, ddlSe, txn)
if err != nil {
return errors.Trace(err)
}
txn.GetSnapshot().SetOption(kv.SnapshotTS, forUpdateTS)

m := meta.NewMeta(txn)
ids, err := m.GenGlobalIDs(count)
if err != nil {
return errors.Trace(err)
}
if err = fn(ids); err != nil {
return errors.Trace(err)
}
return ddlSe.Commit(ctx)
}()

if resErr != nil && kv.IsTxnRetryableError(resErr) {
Contributor (@lance6716, Jul 15, 2024):
Not sure what the meaning of ErrLockExpire is. I just want to cover the case that the pessimistic lock is cleaned by other readers of GlobalIDKey, and that it is retryable.

Contributor Author:
See https://github.com/tikv/client-go/blob/d73cc1ed6503925dfc7226e8d5677ceb4c2fd6f1/txnkv/transaction/2pc.go#L1227-L1230; it is handled in TiDB as below, seemingly with no special handling.

tidb/pkg/session/session.go, lines 1029 to 1040 in 06e0e17:

func (s *session) checkTxnAborted(stmt sqlexec.Statement) error {
	if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) == 0 {
		return nil
	}
	// If the transaction is aborted, the following statements do not need to execute, except `commit` and `rollback`,
	// because they are used to finish the aborted transaction.
	if ok, err := isEndTxnStmt(stmt.(*executor.ExecStmt).StmtNode, s.sessionVars); err == nil && ok {
		return nil
	} else if err != nil {
		return err
	}
	return kv.ErrLockExpire

I guess the reason is: T1 locks the key, so no one can change it before T1's forUpdateTS. After the lock expires and before T1 commits, if a T2 locks it, then T1's commitTS > T2's forUpdateTS > T1's forUpdateTS, so one of them will report a write conflict on commit. (For example: T1 forUpdateTS = 100, T2 forUpdateTS = 110, T1 commitTS = 120; the commit-time conflict check catches the overlap.)

Contributor:
I found this type of error: ERROR 1105 (HY000): tikv aborts txn: Error(Txn(Error(Mvcc(Error(PessimisticLockNotFound, which likely stands for the case that the pessimistic lock is cleaned by other transactions. But I don't know if it's handled by kv.IsTxnRetryableError. Transaction is very complex 😂

Contributor Author:
> But I don't know if it's handled by kv.IsTxnRetryableError

tidb/pkg/kv/error.go, lines 83 to 85 in 9044acb:

	if ErrTxnRetryable.Equal(err) || ErrWriteConflict.Equal(err) || ErrWriteConflictInTiDB.Equal(err) {
		return true
	}

Only ErrTxnRetryable, besides the write-conflict errors.

> Transaction is very complex 😂

Yes, that's why I asked @lcwangchao @cfzjywxk to review this part.

Contributor Author:
The default MaxTxnTTL is 60 * 60 * 1000 (1 hour); it seems OK to let users retry it themselves in this case, since even a DML transaction would abort.

Contributor (@lance6716, Jul 15, 2024):
I see ManagedLockTTL in client-go is 20s. Does that mean that if a pessimistic lock is left in TiKV while the node crashes, other nodes must wait up to 20s to clean up the lock? If the lock owner crashes, submitting DDL will be paused for 20s; that's not friendly.

Contributor Author (@D3Hunter, Jul 15, 2024):
It seems we cannot avoid this on crash for pessimistic transactions now:

  • create table t(id int primary key, v int); insert into t values(1,1);
  • run this:
mysql -uroot -h 127.0.0.1 -P4000 test -e "select now(); begin; update t set v=2 where id=1; select sleep(100);";
+---------------------+
| now()               |
+---------------------+
| 2024-07-15 22:26:12 |
+---------------------+
ERROR 2013 (HY000) at line 1: Lost connection to MySQL server during query
  • kill -9 tidb, immediately after the previous step
  • restart
  • run immediately after restart:
mysql -uroot -h 127.0.0.1 -P4000 test -e "select now(); begin; update t set v=3 where id=1; commit; select now();"
+---------------------+
| now()               |
+---------------------+
| 2024-07-15 22:26:16 |
+---------------------+
+---------------------+
| now()               |
+---------------------+
| 2024-07-15 22:26:32 |
+---------------------+
logutil.DDLLogger().Warn("insert job meet retryable error", zap.Error(resErr))
kv.BackOff(i)
continue
}
break
}
return resErr
}

// lockGlobalIDKey locks the global ID key in the meta store. It keeps retrying on
// write conflict; we cannot have a fixed retry count for this error, see
// https://github.com/pingcap/tidb/issues/27197#issuecomment-2216315057.
// This part is the same as how we implement pessimistic + repeatable read isolation
// level in the SQL executor, see doLockKeys.
// NextGlobalID is a meta key, so we cannot use "select xx for update". If we stored
// it in a table row or used an advisory lock, we would depend on a system table
// created by ourselves, which is cyclic. Although we could create a system table
// without going through the DDL logic, we will only consider changing this when we
// have a data dictionary, and keep it this way for now.
// TODO: maybe we can unify the lock mechanism with the SQL executor in the future, or
// implement it inside TiKV client-go.
func lockGlobalIDKey(ctx context.Context, ddlSe *sess.Session, txn kv.Transaction) (uint64, error) {
Contributor (review comment):
Not a big problem. We can get txn from ddlSe.

Contributor Author:
Will keep it; I don't want to get it again, and this method is internal anyway.

var (
iteration uint
forUpdateTs = txn.StartTS()
ver kv.Version
err error
)
waitTime := ddlSe.GetSessionVars().LockWaitTimeout
Contributor (review comment):
This session (ddlSe) is taken from the session pool, not the user's session. Maybe get the lock wait value from the user's session.

Contributor Author:
This is part of internal DDL execution; the internal setting, not the user's setting, should be used.

Contributor (@lance6716, Jul 15, 2024):
Not sure if it's more reasonable that the setting of the DDL's SQL connection can affect the timeout of the DDL.

Contributor Author (@D3Hunter, Jul 15, 2024):
It seems innodb_lock_wait_timeout only affects transactions for DML in MySQL. MySQL doesn't have a ddl_job table like us, so it shouldn't affect DDL; a simple memory lock should be enough.
https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout

m := meta.NewMeta(txn)
idKey := m.GlobalIDKey()
for {
lockCtx := tikv.NewLockCtx(forUpdateTs, waitTime, time.Now())
err = txn.LockKeys(ctx, lockCtx, idKey)
if err == nil || !terror.ErrorEqual(kv.ErrWriteConflict, err) {
break
}
// ErrWriteConflict contains a conflict-commit-ts in most cases, but it cannot
// be used as forUpdateTs, see comments inside handleAfterPessimisticLockError
ver, err = ddlSe.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
break
}
forUpdateTs = ver.Ver

kv.BackOff(iteration)
// avoid letting it keep growing and overflow.
iteration = min(iteration+1, math.MaxInt)
}
return forUpdateTs, err
}

// combineBatchCreateTableJobs combines a batch of jobs into fewer jobs.
// Currently it only supports combining CreateTable jobs into CreateTables.
func combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTask, error) {
- if len(tasks) <= 1 || !tasks[0].job.LocalMode {
if len(tasks) <= 1 {
return tasks, nil
}
var schemaName string
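The body of combineBatchCreateTableJobs is truncated above; as a rough illustration of the idea, here is a minimal, self-contained sketch using stand-in types (job, createTable, and createTables are assumptions for illustration, not TiDB's model package): CreateTable jobs for the same schema are merged into a single CreateTables job, so the whole batch shares one job ID allocation and one insert.

package main

import "fmt"

type jobType int

const (
	createTable jobType = iota
	createTables
)

type job struct {
	tp     jobType
	schema string
	args   []string // table names, standing in for the real job args
}

// combine merges same-schema CreateTable jobs into one CreateTables job.
// A mixed batch is returned unchanged, mirroring the fallback behavior.
func combine(jobs []*job) []*job {
	if len(jobs) <= 1 {
		return jobs
	}
	schema := jobs[0].schema
	merged := &job{tp: createTables, schema: schema}
	for _, j := range jobs {
		if j.tp != createTable || j.schema != schema {
			return jobs // not combinable: leave the batch as-is
		}
		merged.args = append(merged.args, j.args...)
	}
	return []*job{merged}
}

func main() {
	batch := []*job{
		{tp: createTable, schema: "test", args: []string{"t1"}},
		{tp: createTable, schema: "test", args: []string{"t2"}},
	}
	fmt.Println(len(combine(batch))) // 1
}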