Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: fix pessimitic exist check #19004

Merged
merged 2 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
CheckKeyExists: seVars.StmtCtx.CheckKeyExists,
}
}

Expand Down Expand Up @@ -1652,6 +1653,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
}

sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
sc.CheckKeyExists = make(map[string]struct{})
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
vars.SysWarningCount = warnCount
Expand Down
1 change: 1 addition & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ type LockCtx struct {
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
CheckKeyExists map[string]struct{}
}

// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
Expand Down
182 changes: 182 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,3 +1498,185 @@ func (s *testPessimisticSuite) TestPessimisticUnionForUpdate(c *C) {
tk.MustExec("commit")
tk.MustExec("admin check table t")
}

func (s *testPessimisticSuite) TestInsertDupKeyAfterLock(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop database if exists test_db")
tk.MustExec("create database test_db")
tk.MustExec("use test_db")
tk2.MustExec("use test_db")
tk2.MustExec("drop table if exists t1")
tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));")
tk2.MustExec("insert into t1 values(1, 2, 3);")
tk2.MustExec("insert into t1 values(10, 20, 30);")

// Test insert after lock.
tk.MustExec("begin pessimistic")
err := tk.ExecToErr("update t1 set c2 = 20 where c1 = 1;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 where c2 = 2 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

// Test insert after insert.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("insert into t1 values(5, 6, 7)")
err = tk.ExecToErr("insert into t1 values(6, 6, 7);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 30"))

// Test insert after delete.
tk.MustExec("begin pessimistic")
tk.MustExec("delete from t1 where c2 > 2")
tk.MustExec("insert into t1 values(10, 20, 500);")
err = tk.ExecToErr("insert into t1 values(20, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test range.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 5;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 50;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test select for update after dml.
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values(5, 6, 7)")
tk.MustExec("select * from t1 where c1 = 5 for update")
tk.MustExec("select * from t1 where c1 = 6 for update")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(7, 6, 7)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(5, 8, 6)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("select * from t1 where c1 = 5 for update")
tk.MustExec("select * from t1 where c2 = 8 for update")
tk.MustExec("select * from t1 for update")
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 500"))

// Test optimistic for update.
tk.MustExec("begin optimistic")
tk.MustQuery("select * from t1 where c1 = 1 for update").Check(testkit.Rows("1 2 3"))
tk.MustExec("insert into t1 values(10, 10, 10)")
err = tk.ExecToErr("commit")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
}

func (s *testPessimisticSuite) TestInsertDupKeyAfterLockBatchPointGet(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop database if exists test_db")
tk.MustExec("create database test_db")
tk.MustExec("use test_db")
tk2.MustExec("use test_db")
tk2.MustExec("drop table if exists t1")
tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));")
tk2.MustExec("insert into t1 values(1, 2, 3);")
tk2.MustExec("insert into t1 values(10, 20, 30);")

// Test insert after lock.
tk.MustExec("begin pessimistic")
err := tk.ExecToErr("update t1 set c2 = 20 where c1 in (1);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 where c2 in (2) for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

// Test insert after insert.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("insert into t1 values(5, 6, 7)")
err = tk.ExecToErr("insert into t1 values(6, 6, 7);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 30"))

// Test insert after delete.
tk.MustExec("begin pessimistic")
tk.MustExec("delete from t1 where c2 > 2")
tk.MustExec("insert into t1 values(10, 20, 500);")
err = tk.ExecToErr("insert into t1 values(20, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test range.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 5;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 50;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test select for update after dml.
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values(5, 6, 7)")
tk.MustExec("select * from t1 where c1 in (5, 6) for update")
tk.MustExec("select * from t1 where c1 = 6 for update")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(7, 6, 7)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(5, 8, 6)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("select * from t1 where c2 = 8 for update")
tk.MustExec("select * from t1 where c1 in (5, 8) for update")
tk.MustExec("select * from t1 for update")
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 500"))

// Test optimistic for update.
tk.MustExec("begin optimistic")
tk.MustQuery("select * from t1 where c1 in (1) for update").Check(testkit.Rows("1 2 3"))
tk.MustExec("insert into t1 values(10, 10, 10)")
err = tk.ExecToErr("commit")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
}
6 changes: 4 additions & 2 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ type StatementContext struct {
LockKeysDuration int64
LockKeysCount int32
TblInfo2UnionScan map[*model.TableInfo]bool
TaskID uint64 // unique ID for an execution of a statement
TaskMapBakTS uint64 // counter for
TaskID uint64 // unique ID for an execution of a statement
TaskMapBakTS uint64 // counter for
CheckKeyExists map[string]struct{} // mark the keys needs to check for existence for pessimistic locks.
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -485,6 +486,7 @@ func (sc *StatementContext) ResetForRetry() {
sc.TableIDs = sc.TableIDs[:0]
sc.IndexNames = sc.IndexNames[:0]
sc.TaskID = AllocateTaskID()
sc.CheckKeyExists = make(map[string]struct{})
}

// MergeExecDetails merges a single region execution details into self, used to print
Expand Down
26 changes: 21 additions & 5 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type tikvTxn struct {
startTime time.Time // Monotonic timestamp for recording txn time consuming.
commitTS uint64
lockKeys [][]byte
lockedMap map[string]struct{}
lockedMap map[string]bool
mu sync.Mutex // For thread-safe LockKeys function.
setCnt int64
vars *kv.Variables
Expand Down Expand Up @@ -100,7 +100,7 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uin
return &tikvTxn{
snapshot: snapshot,
us: kv.NewUnionStore(snapshot),
lockedMap: map[string]struct{}{},
lockedMap: make(map[string]bool),
store: store,
startTS: startTS,
startTime: time.Now(),
Expand Down Expand Up @@ -350,9 +350,17 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
}
}()
for _, key := range keysInput {
if _, ok := txn.lockedMap[string(key)]; !ok {
// The value of lockedMap is only used by pessimistic transactions.
valueExist, locked := txn.lockedMap[string(key)]
_, checkKeyExists := lockCtx.CheckKeyExists[string(key)]
if !locked {
keys = append(keys, key)
} else if lockCtx.ReturnValues {
} else if txn.IsPessimistic() {
if checkKeyExists && valueExist {
return txn.committer.extractKeyExistsErr(key)
}
}
if lockCtx.ReturnValues && locked {
// An already locked key can not return values, we add an entry to let the caller get the value
// in other ways.
lockCtx.Values[string(key)] = kv.ReturnedValue{AlreadyLocked: true}
Expand Down Expand Up @@ -425,7 +433,15 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
}
txn.lockKeys = append(txn.lockKeys, keys...)
for _, key := range keys {
txn.lockedMap[string(key)] = struct{}{}
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
// For other lock modes, the locked key values always exist.
if lockCtx.ReturnValues {
val, _ := lockCtx.Values[string(key)]
valExists := len(val.Value) > 0
txn.lockedMap[string(key)] = valExists
} else {
txn.lockedMap[string(key)] = true
}
}
txn.dirty = true
return nil
Expand Down
1 change: 1 addition & 0 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (c *index) Create(sctx sessionctx.Context, us kv.UnionStore, indexedValues
if err != nil || len(value) == 0 {
if sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil {
err = us.GetMemBuffer().SetWithFlags(key, idxVal, kv.SetPresumeKeyNotExists)
sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{}
} else {
err = us.GetMemBuffer().Set(key, idxVal)
}
Expand Down
1 change: 1 addition & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .

if setPresume {
err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists)
sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{}
} else {
err = memBuffer.Set(key, value)
}
Expand Down