Skip to content

Commit

Permalink
txn: Optimize pessimistic transaction by supporting locking with conf…
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored Jan 13, 2023
1 parent 9a917c8 commit b1ecabb
Show file tree
Hide file tree
Showing 34 changed files with 905 additions and 132 deletions.
2 changes: 1 addition & 1 deletion ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ func TestAddColumn2(t *testing.T) {
require.NoError(t, err)
_, err = writeOnlyTable.AddRecord(tk.Session(), types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()), table.IsUpdate)
require.NoError(t, err)
tk.Session().StmtCommit()
tk.Session().StmtCommit(ctx)
err = tk.Session().CommitTxn(ctx)
require.NoError(t, err)

Expand Down
6 changes: 3 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,7 @@ func (s *session) begin() error {
}

func (s *session) commit() error {
s.StmtCommit()
s.StmtCommit(context.Background())
return s.CommitTxn(context.Background())
}

Expand All @@ -1546,12 +1546,12 @@ func (s *session) txn() (kv.Transaction, error) {
}

func (s *session) rollback() {
s.StmtRollback()
s.StmtRollback(context.Background(), false)
s.RollbackTxn(context.Background())
}

func (s *session) reset() {
s.StmtRollback()
s.StmtRollback(context.Background(), false)
}

func (s *session) execute(ctx context.Context, query string, label string) ([]chunk.Row, error) {
Expand Down
2 changes: 1 addition & 1 deletion ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestDDLStatsInfo(t *testing.T) {
require.NoError(t, err)
_, err = m.AddRecord(ctx, types.MakeDatums(3, 3))
require.NoError(t, err)
ctx.StmtCommit()
ctx.StmtCommit(context.Background())
require.NoError(t, ctx.CommitTxn(context.Background()))

job := buildCreateIdxJob(dbInfo, tblInfo, true, "idx", "c1")
Expand Down
56 changes: 50 additions & 6 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ var (
totalQueryProcHistogramInternal = metrics.TotalQueryProcHistogram.WithLabelValues(metrics.LblInternal)
totalCopProcHistogramInternal = metrics.TotalCopProcHistogram.WithLabelValues(metrics.LblInternal)
totalCopWaitHistogramInternal = metrics.TotalCopWaitHistogram.WithLabelValues(metrics.LblInternal)

selectForUpdateFirstAttemptDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("select-for-update", "first-attempt")
selectForUpdateRetryDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("select-for-update", "retry")
dmlFirstAttemptDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("dml", "first-attempt")
dmlRetryDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("dml", "retry")
)

// processinfoSetter is the interface use to set current running process info.
Expand Down Expand Up @@ -593,7 +598,7 @@ func (a *ExecStmt) handleStmtForeignKeyTrigger(ctx context.Context, e Executor)
// change first.
// Since `UnionScanExec` use `SnapshotIter` and `SnapshotGetter` to read txn mem-buffer, if we don't do `StmtCommit`,
// then the fk cascade executor can't read the mem-buffer changed by the ExecStmt.
a.Ctx.StmtCommit()
a.Ctx.StmtCommit(ctx)
}
err := a.handleForeignKeyTrigger(ctx, e, 1)
if err != nil {
Expand Down Expand Up @@ -684,7 +689,7 @@ func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeEx
}
// Call `StmtCommit` uses to flush the fk cascade executor change into txn mem-buffer,
// then the later fk cascade executors can see the mem-buffer changes.
a.Ctx.StmtCommit()
a.Ctx.StmtCommit(ctx)
err = a.handleForeignKeyTrigger(ctx, e, depth+1)
if err != nil {
return err
Expand Down Expand Up @@ -858,15 +863,34 @@ func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Execu
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
}

txnManager := sessiontxn.GetTxnManager(a.Ctx)
err := txnManager.OnHandlePessimisticStmtStart(ctx)
if err != nil {
return nil, err
}

isFirstAttempt := true

for {
startTime := time.Now()
rs, err := a.runPessimisticSelectForUpdate(ctx, e)

if isFirstAttempt {
selectForUpdateFirstAttemptDuration.Observe(time.Since(startTime).Seconds())
isFirstAttempt = false
} else {
selectForUpdateRetryDuration.Observe(time.Since(startTime).Seconds())
}

e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return nil, err
}
if e == nil {
return rs, nil
}

failpoint.Inject("pessimisticSelectForUpdateRetry", nil)
}
}

Expand Down Expand Up @@ -962,18 +986,39 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) (err er
err = ErrLazyUniquenessCheckFailure.GenWithStackByArgs(err.Error())
}
}()

txnManager := sessiontxn.GetTxnManager(a.Ctx)
err = txnManager.OnHandlePessimisticStmtStart(ctx)
if err != nil {
return err
}

isFirstAttempt := true

for {
startPointGetLocking := time.Now()
if !isFirstAttempt {
failpoint.Inject("pessimisticDMLRetry", nil)
}

startTime := time.Now()
_, err = a.handleNoDelayExecutor(ctx, e)
if !txn.Valid() {
return err
}

if isFirstAttempt {
dmlFirstAttemptDuration.Observe(time.Since(startTime).Seconds())
isFirstAttempt = false
} else {
dmlRetryDuration.Observe(time.Since(startTime).Seconds())
}

if err != nil {
// It is possible the DML has point get plan that locks the key.
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
if ErrDeadlock.Equal(err) {
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startPointGetLocking).Seconds())
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startTime).Seconds())
}
return err
}
Expand Down Expand Up @@ -1071,7 +1116,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
return nil, err
}
// Rollback the statement change before retry it.
a.Ctx.StmtRollback()
a.Ctx.StmtRollback(ctx, true)
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.GetSessionVars().RetryInfo.ResetOffset()

Expand Down Expand Up @@ -1957,7 +2002,6 @@ func (a *ExecStmt) getSQLPlanDigest() ([]byte, []byte) {
}
return sqlDigest, planDigest
}

func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model.TableItemID]string) map[string]map[string]string {
if len(statsLoadStatus) < 1 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (e *DeleteExec) doBatchDelete(ctx context.Context) error {
return ErrBatchInsertFail.GenWithStack("BatchDelete failed with error: %v", err)
}
e.memTracker.Consume(-int64(txn.Size()))
e.ctx.StmtCommit()
e.ctx.StmtCommit(ctx)
if err := sessiontxn.NewTxnInStmt(ctx, e.ctx); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.GenWithStack("BatchDelete failed with error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4379,7 +4379,7 @@ func TestAdminShowDDLJobs(t *testing.T) {
require.NoError(t, err)
err = meta.NewMeta(txn).AddHistoryDDLJob(job, true)
require.NoError(t, err)
tk.Session().StmtCommit()
tk.Session().StmtCommit(context.Background())

re = tk.MustQuery("admin show ddl jobs 1")
row = re.Rows()[0]
Expand Down
2 changes: 1 addition & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (e *InsertValues) doBatchInsert(ctx context.Context) error {
return ErrBatchInsertFail.GenWithStack("BatchInsert failed with error: %v", err)
}
e.memTracker.Consume(-int64(txn.Size()))
e.ctx.StmtCommit()
e.ctx.StmtCommit(ctx)
if err := sessiontxn.NewTxnInStmt(ctx, e.ctx); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.GenWithStack("BatchInsert failed with error: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error
var err error
defer func() {
if err != nil {
e.Ctx.StmtRollback()
e.Ctx.StmtRollback(ctx, false)
}
}()
err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
Expand All @@ -327,7 +327,7 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error
failpoint.Inject("commitOneTaskErr", func() error {
return errors.New("mock commit one task error")
})
e.Ctx.StmtCommit()
e.Ctx.StmtCommit(ctx)
// Make sure process stream routine never use invalid txn
e.txnInUse.Lock()
defer e.txnInUse.Unlock()
Expand All @@ -353,7 +353,7 @@ func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
e.ForceQuit()
}
if err != nil {
e.ctx.StmtRollback()
e.ctx.StmtRollback(ctx, false)
}
}()
var tasks uint64
Expand Down
4 changes: 2 additions & 2 deletions executor/writetest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, t *testing.T, tk *t
}
ld.SetMessage()
require.Equal(t, tt.expectedMsg, tk.Session().LastMessage())
ctx.StmtCommit()
ctx.StmtCommit(context.Background())
txn, err := ctx.Txn(true)
require.NoError(t, err)
err = txn.Commit(context.Background())
Expand Down Expand Up @@ -2353,7 +2353,7 @@ func TestLoadDataIntoPartitionedTable(t *testing.T) {
require.NoError(t, err)
ld.SetMaxRowsInBatch(20000)
ld.SetMessage()
ctx.StmtCommit()
ctx.StmtCommit(context.Background())
txn, err := ctx.Txn(true)
require.NoError(t, err)
err = txn.Commit(context.Background())
Expand Down
6 changes: 6 additions & 0 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func (t *mockTxn) Mem() uint64 {
return 0
}

func (t *mockTxn) StartAggressiveLocking() error { return nil }
func (t *mockTxn) RetryAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) CancelAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) DoneAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) IsInAggressiveLockingMode() bool { return false }

// newMockTxn new a mockTxn.
func newMockTxn() Transaction {
return &mockTxn{
Expand Down
10 changes: 10 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type LockCtx = tikvstore.LockCtx
type Transaction interface {
RetrieverMutator
AssertionProto
AggressiveLockingController
// Size returns sum of keys and values length.
Size() int
// Mem returns the memory consumption of the transaction.
Expand Down Expand Up @@ -281,6 +282,15 @@ type AssertionProto interface {
SetAssertion(key []byte, assertion ...FlagsOp) error
}

// AggressiveLockingController is the interface that defines aggressive locking related operations.
type AggressiveLockingController interface {
StartAggressiveLocking() error
RetryAggressiveLocking(ctx context.Context) error
CancelAggressiveLocking(ctx context.Context) error
DoneAggressiveLocking(ctx context.Context) error
IsInAggressiveLockingMode() bool
}

// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func RegisterMetrics() {
prometheus.MustRegister(ReadFromTableCacheCounter)
prometheus.MustRegister(LoadTableCacheDurationHistogram)
prometheus.MustRegister(NonTransactionalDMLCount)
prometheus.MustRegister(PessimisticDMLDurationByAttempt)
prometheus.MustRegister(MemoryUsage)
prometheus.MustRegister(StatsCacheLRUCounter)
prometheus.MustRegister(StatsCacheLRUGauge)
Expand Down
9 changes: 9 additions & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@ var (
Help: "Counter of setting tidb_constraint_check_in_place to false, note that it doesn't count the default value set by tidb config",
},
)

PessimisticDMLDurationByAttempt = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "session",
Name: "transaction_pessimistic_dml_duration_by_attempt",
Help: "Bucketed histogram of duration of pessimistic DMLs, distinguished by first attempt and retries",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 28), // 1ms ~ 1.5days
}, []string{LblType, LblPhase})
)

// Label constants.
Expand Down
6 changes: 3 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,10 +1268,10 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
}
s.txn.onStmtEnd()
if err != nil {
s.StmtRollback()
s.StmtRollback(ctx, false)
break
}
s.StmtCommit()
s.StmtCommit(ctx)
}
logutil.Logger(ctx).Warn("transaction association",
zap.Uint64("retrying txnStartTS", s.GetSessionVars().TxnCtx.StartTS),
Expand Down Expand Up @@ -2355,7 +2355,7 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
if rs != nil {
if se.GetSessionVars().StmtCtx.IsExplainAnalyzeDML {
if !sessVars.InTxn() {
se.StmtCommit()
se.StmtCommit(ctx)
if err := se.CommitTxn(ctx); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St
// Handle the stmt commit/rollback.
if se.txn.Valid() {
if meetsErr != nil {
se.StmtRollback()
se.StmtRollback(ctx, false)
} else {
se.StmtCommit()
se.StmtCommit(ctx)
}
}
}
Expand Down
Loading

0 comments on commit b1ecabb

Please sign in to comment.