Skip to content

Commit

Permalink
*: lock unchanged rows for pessimistic transaction (#14045)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and sre-bot committed Dec 13, 2019
1 parent 5cac105 commit 9009da7
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
1 change: 1 addition & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
if err1 != nil {
return err1
}
keys = txnCtx.CollectUnchangedRowKeys(keys)
if len(keys) == 0 {
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -123,6 +124,11 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(t.Meta().ID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
}
return false, false, 0, nil
}

Expand Down
30 changes: 30 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,36 @@ func (s *testPessimisticSuite) TestBankTransfer(c *C) {
tk.MustQuery("select sum(c) from accounts").Check(testkit.Rows("300"))
}

func (s *testPessimisticSuite) TestLockUnchangedRowKey(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists unchanged")
tk.MustExec("create table unchanged (id int primary key, c int)")
tk.MustExec("insert unchanged values (1, 1), (2, 2)")

tk.MustExec("begin pessimistic")
tk.MustExec("update unchanged set c = 1 where id < 2")

tk2.MustExec("begin pessimistic")
err := tk2.ExecToErr("select * from unchanged where id = 1 for update nowait")
c.Assert(err, NotNil)

tk.MustExec("rollback")

tk2.MustQuery("select * from unchanged where id = 1 for update nowait")

tk.MustExec("begin pessimistic")
tk.MustExec("insert unchanged values (2, 2) on duplicate key update c = values(c)")

err = tk2.ExecToErr("select * from unchanged where id = 2 for update nowait")
c.Assert(err, NotNil)

tk.MustExec("commit")

tk2.MustQuery("select * from unchanged where id = 1 for update nowait")
tk2.MustExec("rollback")
}

func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type TransactionContext struct {
Shard *int64
TableDeltaMap map[int64]TableDelta

// unchangedRowKeys is used to store the unchanged rows that needs to lock for pessimistic transaction.
unchangedRowKeys map[string]struct{}

// CreateTime For metrics.
CreateTime time.Time
StatementCount int
Expand All @@ -113,6 +116,23 @@ type TransactionContext struct {
IsPessimistic bool
}

// AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock.
func (tc *TransactionContext) AddUnchangedRowKey(key []byte) {
if tc.unchangedRowKeys == nil {
tc.unchangedRowKeys = map[string]struct{}{}
}
tc.unchangedRowKeys[string(key)] = struct{}{}
}

// CollectUnchangedRowKeys collects unchanged row keys for pessimistic lock.
func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key {
for key := range tc.unchangedRowKeys {
buf = append(buf, kv.Key(key))
}
tc.unchangedRowKeys = nil
return buf
}

// UpdateDeltaForTable updates the delta info for some table.
func (tc *TransactionContext) UpdateDeltaForTable(tableID int64, delta int64, count int64, colSize map[int64]int64) {
if tc.TableDeltaMap == nil {
Expand Down

0 comments on commit 9009da7

Please sign in to comment.