Skip to content

Commit

Permalink
*: lock unchanged rows for pessimistic transaction (pingcap#14045)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood committed Dec 13, 2019
1 parent be7cb4d commit 79807ba
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
1 change: 1 addition & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
if err1 != nil {
return err1
}
keys = txnCtx.CollectUnchangedRowKeys(keys)
if len(keys) == 0 {
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -116,6 +117,11 @@ func updateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datu
if ctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(t.Meta().ID, h)
txnCtx := ctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
}
return false, false, 0, nil
}

Expand Down
30 changes: 30 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,36 @@ func (s *testPessimisticSuite) TestBankTransfer(c *C) {
tk.MustQuery("select sum(c) from accounts").Check(testkit.Rows("300"))
}

func (s *testPessimisticSuite) TestLockUnchangedRowKey(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists unchanged")
tk.MustExec("create table unchanged (id int primary key, c int)")
tk.MustExec("insert unchanged values (1, 1), (2, 2)")

tk.MustExec("begin pessimistic")
tk.MustExec("update unchanged set c = 1 where id < 2")

tk2.MustExec("begin pessimistic")
err := tk2.ExecToErr("select * from unchanged where id = 1 for update nowait")
c.Assert(err, NotNil)

tk.MustExec("rollback")

tk2.MustQuery("select * from unchanged where id = 1 for update nowait")

tk.MustExec("begin pessimistic")
tk.MustExec("insert unchanged values (2, 2) on duplicate key update c = values(c)")

err = tk2.ExecToErr("select * from unchanged where id = 2 for update nowait")
c.Assert(err, NotNil)

tk.MustExec("commit")

tk2.MustQuery("select * from unchanged where id = 1 for update nowait")
tk2.MustExec("rollback")
}

func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,33 @@ type TransactionContext struct {
TableDeltaMap map[int64]TableDelta
IsPessimistic bool

// unchangedRowKeys is used to store the unchanged rows that needs to lock for pessimistic transaction.
unchangedRowKeys map[string]struct{}

// CreateTime For metrics.
CreateTime time.Time
StatementCount int

IsBatched bool
}

// AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock.
func (tc *TransactionContext) AddUnchangedRowKey(key []byte) {
if tc.unchangedRowKeys == nil {
tc.unchangedRowKeys = map[string]struct{}{}
}
tc.unchangedRowKeys[string(key)] = struct{}{}
}

// CollectUnchangedRowKeys collects unchanged row keys for pessimistic lock.
func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key {
for key := range tc.unchangedRowKeys {
buf = append(buf, kv.Key(key))
}
tc.unchangedRowKeys = nil
return buf
}

// UpdateDeltaForTable updates the delta info for some table.
func (tc *TransactionContext) UpdateDeltaForTable(tableID int64, delta int64, count int64, colSize map[int64]int64) {
if tc.TableDeltaMap == nil {
Expand Down

0 comments on commit 79807ba

Please sign in to comment.