From 9009da737834fbe8fb9eee2541133ae92905e7a5 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Fri, 13 Dec 2019 12:00:28 +0800 Subject: [PATCH] *: lock unchanged rows for pessimistic transaction (#14045) --- executor/adapter.go | 1 + executor/write.go | 6 ++++++ session/pessimistic_test.go | 30 ++++++++++++++++++++++++++++++ sessionctx/variable/session.go | 20 ++++++++++++++++++++ 4 files changed, 57 insertions(+) diff --git a/executor/adapter.go b/executor/adapter.go index 03fbf8696e9a3..ef807b4977194 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -526,6 +526,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { if err1 != nil { return err1 } + keys = txnCtx.CollectUnchangedRowKeys(keys) if len(keys) == 0 { return nil } diff --git a/executor/write.go b/executor/write.go index cc2b05409385c..27713f336902d 100644 --- a/executor/write.go +++ b/executor/write.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -123,6 +124,11 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 { sc.AddAffectedRows(1) } + unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(t.Meta().ID, h) + txnCtx := sctx.GetSessionVars().TxnCtx + if txnCtx.IsPessimistic { + txnCtx.AddUnchangedRowKey(unchangedRowKey) + } return false, false, 0, nil } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 1f53349c4135b..7290716c1ca04 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -351,6 +351,36 @@ func (s *testPessimisticSuite) TestBankTransfer(c *C) { tk.MustQuery("select sum(c) from accounts").Check(testkit.Rows("300")) } +func (s *testPessimisticSuite) TestLockUnchangedRowKey(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists unchanged") + tk.MustExec("create table unchanged (id int primary key, c int)") + tk.MustExec("insert unchanged values (1, 1), (2, 2)") + + tk.MustExec("begin pessimistic") + tk.MustExec("update unchanged set c = 1 where id < 2") + + tk2.MustExec("begin pessimistic") + err := tk2.ExecToErr("select * from unchanged where id = 1 for update nowait") + c.Assert(err, NotNil) + + tk.MustExec("rollback") + + tk2.MustQuery("select * from unchanged where id = 1 for update nowait") + + tk.MustExec("begin pessimistic") + tk.MustExec("insert unchanged values (2, 2) on duplicate key update c = values(c)") + + err = tk2.ExecToErr("select * from unchanged where id = 2 for update nowait") + c.Assert(err, NotNil) + + tk.MustExec("commit") + + tk2.MustQuery("select * from unchanged where id = 1 for update nowait") + tk2.MustExec("rollback") +} + func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk2 := testkit.NewTestKitWithInit(c, s.store) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 06909a7115f4d..81d2e304825c4 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -105,6 +105,9 @@ type TransactionContext struct { Shard *int64 TableDeltaMap map[int64]TableDelta + // unchangedRowKeys is used to store the unchanged rows that needs to lock for pessimistic transaction. + unchangedRowKeys map[string]struct{} + // CreateTime For metrics. CreateTime time.Time StatementCount int @@ -113,6 +116,23 @@ type TransactionContext struct { IsPessimistic bool } +// AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock. +func (tc *TransactionContext) AddUnchangedRowKey(key []byte) { + if tc.unchangedRowKeys == nil { + tc.unchangedRowKeys = map[string]struct{}{} + } + tc.unchangedRowKeys[string(key)] = struct{}{} +} + +// CollectUnchangedRowKeys collects unchanged row keys for pessimistic lock. +func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key { + for key := range tc.unchangedRowKeys { + buf = append(buf, kv.Key(key)) + } + tc.unchangedRowKeys = nil + return buf +} + // UpdateDeltaForTable updates the delta info for some table. func (tc *TransactionContext) UpdateDeltaForTable(tableID int64, delta int64, count int64, colSize map[int64]int64) { if tc.TableDeltaMap == nil {