diff --git a/executor/executor.go b/executor/executor.go index 6a5ad18e1488b..669a67ac0f2ab 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -955,6 +955,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx { LockKeysDuration: &seVars.StmtCtx.LockKeysDuration, LockKeysCount: &seVars.StmtCtx.LockKeysCount, LockExpired: &seVars.TxnCtx.LockExpire, + CheckKeyExists: seVars.StmtCtx.CheckKeyExists, } } @@ -1652,6 +1653,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool) + sc.CheckKeyExists = make(map[string]struct{}) errCount, warnCount := vars.StmtCtx.NumErrorWarnings() vars.SysErrorCount = errCount vars.SysWarningCount = warnCount diff --git a/kv/kv.go b/kv/kv.go index 361c635ec9533..c066e20ac1e2d 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -256,6 +256,7 @@ type LockCtx struct { Values map[string]ReturnedValue ValuesLock sync.Mutex LockExpired *uint32 + CheckKeyExists map[string]struct{} } // ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result. diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 6d11da84c1a28..d78defdb93951 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -1498,3 +1498,185 @@ func (s *testPessimisticSuite) TestPessimisticUnionForUpdate(c *C) { tk.MustExec("commit") tk.MustExec("admin check table t") } + +func (s *testPessimisticSuite) TestInsertDupKeyAfterLock(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop database if exists test_db") + tk.MustExec("create database test_db") + tk.MustExec("use test_db") + tk2.MustExec("use test_db") + tk2.MustExec("drop table if exists t1") + tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));") + tk2.MustExec("insert into t1 values(1, 2, 3);") + tk2.MustExec("insert into t1 values(10, 20, 30);") + + // Test insert after lock. + tk.MustExec("begin pessimistic") + err := tk.ExecToErr("update t1 set c2 = 20 where c1 = 1;") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30")) + + tk.MustExec("begin pessimistic") + tk.MustExec("select * from t1 for update") + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30")) + + tk.MustExec("begin pessimistic") + tk.MustExec("select * from t1 where c2 = 2 for update") + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30")) + + // Test insert after insert. + tk.MustExec("begin pessimistic") + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("insert into t1 values(5, 6, 7)") + err = tk.ExecToErr("insert into t1 values(6, 6, 7);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 30")) + + // Test insert after delete. + tk.MustExec("begin pessimistic") + tk.MustExec("delete from t1 where c2 > 2") + tk.MustExec("insert into t1 values(10, 20, 500);") + err = tk.ExecToErr("insert into t1 values(20, 20, 30);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(1, 20, 30);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500")) + + // Test range. + tk.MustExec("begin pessimistic") + err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 5;") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 50;") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500")) + + // Test select for update after dml. + tk.MustExec("begin pessimistic") + tk.MustExec("insert into t1 values(5, 6, 7)") + tk.MustExec("select * from t1 where c1 = 5 for update") + tk.MustExec("select * from t1 where c1 = 6 for update") + tk.MustExec("select * from t1 for update") + err = tk.ExecToErr("insert into t1 values(7, 6, 7)") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(5, 8, 6)") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("select * from t1 where c1 = 5 for update") + tk.MustExec("select * from t1 where c2 = 8 for update") + tk.MustExec("select * from t1 for update") + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 500")) + + // Test optimistic for update. + tk.MustExec("begin optimistic") + tk.MustQuery("select * from t1 where c1 = 1 for update").Check(testkit.Rows("1 2 3")) + tk.MustExec("insert into t1 values(10, 10, 10)") + err = tk.ExecToErr("commit") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) +} + +func (s *testPessimisticSuite) TestInsertDupKeyAfterLockBatchPointGet(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop database if exists test_db") + tk.MustExec("create database test_db") + tk.MustExec("use test_db") + tk2.MustExec("use test_db") + tk2.MustExec("drop table if exists t1") + tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));") + tk2.MustExec("insert into t1 values(1, 2, 3);") + tk2.MustExec("insert into t1 values(10, 20, 30);") + + // Test insert after lock. + tk.MustExec("begin pessimistic") + err := tk.ExecToErr("update t1 set c2 = 20 where c1 in (1);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30")) + + tk.MustExec("begin pessimistic") + tk.MustExec("select * from t1 for update") + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30")) + + tk.MustExec("begin pessimistic") + tk.MustExec("select * from t1 where c2 in (2) for update") + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30")) + + // Test insert after insert. + tk.MustExec("begin pessimistic") + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("insert into t1 values(5, 6, 7)") + err = tk.ExecToErr("insert into t1 values(6, 6, 7);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 30")) + + // Test insert after delete. + tk.MustExec("begin pessimistic") + tk.MustExec("delete from t1 where c2 > 2") + tk.MustExec("insert into t1 values(10, 20, 500);") + err = tk.ExecToErr("insert into t1 values(20, 20, 30);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(1, 20, 30);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500")) + + // Test range. + tk.MustExec("begin pessimistic") + err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 5;") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 50;") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(1, 15, 300);") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500")) + + // Test select for update after dml. + tk.MustExec("begin pessimistic") + tk.MustExec("insert into t1 values(5, 6, 7)") + tk.MustExec("select * from t1 where c1 in (5, 6) for update") + tk.MustExec("select * from t1 where c1 = 6 for update") + tk.MustExec("select * from t1 for update") + err = tk.ExecToErr("insert into t1 values(7, 6, 7)") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + err = tk.ExecToErr("insert into t1 values(5, 8, 6)") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) + tk.MustExec("select * from t1 where c2 = 8 for update") + tk.MustExec("select * from t1 where c1 in (5, 8) for update") + tk.MustExec("select * from t1 for update") + tk.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 500")) + + // Test optimistic for update. + tk.MustExec("begin optimistic") + tk.MustQuery("select * from t1 where c1 in (1) for update").Check(testkit.Rows("1 2 3")) + tk.MustExec("insert into t1 values(10, 10, 10)") + err = tk.ExecToErr("commit") + c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) +} diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 6498180875cb8..1162f592c0848 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -152,8 +152,9 @@ type StatementContext struct { LockKeysDuration int64 LockKeysCount int32 TblInfo2UnionScan map[*model.TableInfo]bool - TaskID uint64 // unique ID for an execution of a statement - TaskMapBakTS uint64 // counter for + TaskID uint64 // unique ID for an execution of a statement + TaskMapBakTS uint64 // counter for + CheckKeyExists map[string]struct{} // mark the keys needs to check for existence for pessimistic locks. } // StmtHints are SessionVars related sql hints. @@ -485,6 +486,7 @@ func (sc *StatementContext) ResetForRetry() { sc.TableIDs = sc.TableIDs[:0] sc.IndexNames = sc.IndexNames[:0] sc.TaskID = AllocateTaskID() + sc.CheckKeyExists = make(map[string]struct{}) } // MergeExecDetails merges a single region execution details into self, used to print diff --git a/store/tikv/txn.go b/store/tikv/txn.go index c5b181953ea85..45ecd38b2b060 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -62,7 +62,7 @@ type tikvTxn struct { startTime time.Time // Monotonic timestamp for recording txn time consuming. commitTS uint64 lockKeys [][]byte - lockedMap map[string]struct{} + lockedMap map[string]bool mu sync.Mutex // For thread-safe LockKeys function. setCnt int64 vars *kv.Variables @@ -100,7 +100,7 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uin return &tikvTxn{ snapshot: snapshot, us: kv.NewUnionStore(snapshot), - lockedMap: map[string]struct{}{}, + lockedMap: make(map[string]bool), store: store, startTS: startTS, startTime: time.Now(), @@ -350,9 +350,17 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput } }() for _, key := range keysInput { - if _, ok := txn.lockedMap[string(key)]; !ok { + // The value of lockedMap is only used by pessimistic transactions. + valueExist, locked := txn.lockedMap[string(key)] + _, checkKeyExists := lockCtx.CheckKeyExists[string(key)] + if !locked { keys = append(keys, key) - } else if lockCtx.ReturnValues { + } else if txn.IsPessimistic() { + if checkKeyExists && valueExist { + return txn.committer.extractKeyExistsErr(key) + } + } + if lockCtx.ReturnValues && locked { // An already locked key can not return values, we add an entry to let the caller get the value // in other ways. lockCtx.Values[string(key)] = kv.ReturnedValue{AlreadyLocked: true} @@ -425,7 +433,15 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput } txn.lockKeys = append(txn.lockKeys, keys...) for _, key := range keys { - txn.lockedMap[string(key)] = struct{}{} + // PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist. + // For other lock modes, the locked key values always exist. + if lockCtx.ReturnValues { + val, _ := lockCtx.Values[string(key)] + valExists := len(val.Value) > 0 + txn.lockedMap[string(key)] = valExists + } else { + txn.lockedMap[string(key)] = true + } } txn.dirty = true return nil diff --git a/table/tables/index.go b/table/tables/index.go index 01c919f1754a7..6ba83f65c4a1c 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -284,6 +284,7 @@ func (c *index) Create(sctx sessionctx.Context, us kv.UnionStore, indexedValues if err != nil || len(value) == 0 { if sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil { err = us.GetMemBuffer().SetWithFlags(key, idxVal, kv.SetPresumeKeyNotExists) + sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{} } else { err = us.GetMemBuffer().Set(key, idxVal) } diff --git a/table/tables/tables.go b/table/tables/tables.go index c705b5f83cc50..1b6632fb209b6 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -699,6 +699,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . if setPresume { err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists) + sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{} } else { err = memBuffer.Set(key, value) }