Skip to content

Commit

Permalink
unistore: Update unistore for assertion (pingcap#32189)
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored Feb 9, 2022
1 parent 297455d commit a850b04
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 6 deletions.
15 changes: 15 additions & 0 deletions store/mockstore/unistore/tikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tikv

import (
"encoding/hex"
"fmt"

deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
Expand Down Expand Up @@ -132,3 +133,17 @@ type ErrTxnNotFound struct {
func (e *ErrTxnNotFound) Error() string {
return "txn not found"
}

// ErrAssertionFailed is returned if any assertion fails on a transaction request.
type ErrAssertionFailed struct {
StartTS uint64
Key []byte
Assertion kvrpcpb.Assertion
ExistingStartTS uint64
ExistingCommitTS uint64
}

func (e *ErrAssertionFailed) Error() string {
return fmt.Sprintf("AssertionFailed { StartTS: %v, Key: %v, Assertion: %v, ExistingStartTS: %v, ExistingCommitTS: %v }",
e.StartTS, hex.EncodeToString(e.Key), e.Assertion.String(), e.ExistingStartTS, e.ExistingCommitTS)
}
42 changes: 39 additions & 3 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,17 +293,23 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
resp.Value = val
resp.CommitTs = dbMeta.CommitTS()
}
if req.ReturnValues {
if req.ReturnValues || req.CheckExistence {
for _, item := range items {
if item == nil {
resp.Values = append(resp.Values, nil)
if req.ReturnValues {
resp.Values = append(resp.Values, nil)
}
resp.NotFounds = append(resp.NotFounds, true)
continue
}
val, err1 := item.ValueCopy(nil)
if err1 != nil {
return nil, err1
}
resp.Values = append(resp.Values, val)
if req.ReturnValues {
resp.Values = append(resp.Values, val)
}
resp.NotFounds = append(resp.NotFounds, len(val) == 0)
}
}
return nil, err
Expand Down Expand Up @@ -853,6 +859,36 @@ func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutatio
Value: m.Value,
Secondaries: req.Secondaries,
}
// Note that this is not fully consistent with TiKV. TiKV doesn't always get the value from Write CF. In
// AssertionLevel_Fast, TiKV skips checking assertion if Write CF is not read, in order not to harm the performance.
// However, unistore can always check it. It's better not to assume the store's behavior about assertion when the
// mode is set to AssertionLevel_Fast.
if req.AssertionLevel != kvrpcpb.AssertionLevel_Off {
if item == nil || item.IsEmpty() {
if m.Assertion == kvrpcpb.Assertion_Exist {
log.Error("ASSERTION FAIL!!! non-exist for must exist key", zap.Stringer("mutation", m))
return nil, &ErrAssertionFailed{
StartTS: req.StartVersion,
Key: m.Key,
Assertion: m.Assertion,
ExistingStartTS: 0,
ExistingCommitTS: 0,
}
}
} else {
if m.Assertion == kvrpcpb.Assertion_NotExist {
log.Error("ASSERTION FAIL!!! exist for must non-exist key", zap.Stringer("mutation", m))
userMeta := mvcc.DBUserMeta(item.UserMeta())
return nil, &ErrAssertionFailed{
StartTS: req.StartVersion,
Key: m.Key,
Assertion: m.Assertion,
ExistingStartTS: userMeta.StartTS(),
ExistingCommitTS: userMeta.CommitTS(),
}
}
}
}
var err error
lock.Op = uint8(m.Op)
if lock.Op == uint8(kvrpcpb.Op_Insert) {
Expand Down
128 changes: 125 additions & 3 deletions store/mockstore/unistore/tikv/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/pingcap/badger"
"github.com/pingcap/badger/y"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/store/mockstore/unistore/config"
Expand Down Expand Up @@ -154,32 +155,54 @@ func PessimisticLock(pk []byte, key []byte, startTs uint64, lockTTL uint64, forU
// PrewriteOptimistic raises optimistic prewrite requests on store
func PrewriteOptimistic(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
minCommitTs uint64, useAsyncCommit bool, secondaries [][]byte, store *TestStore) error {
return PrewriteOptimisticWithAssertion(pk, key, value, startTs, lockTTL, minCommitTs, useAsyncCommit, secondaries,
kvrpcpb.Assertion_None, kvrpcpb.AssertionLevel_Off, store)
}

// PrewriteOptimisticWithAssertion raises optimistic prewrite requests on store, with specified assertion config
func PrewriteOptimisticWithAssertion(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
minCommitTs uint64, useAsyncCommit bool, secondaries [][]byte, assertion kvrpcpb.Assertion,
assertionLevel kvrpcpb.AssertionLevel, store *TestStore) error {
op := kvrpcpb.Op_Put
if value == nil {
op = kvrpcpb.Op_Del
}
mutation := newMutation(op, key, value)
mutation.Assertion = assertion
prewriteReq := &kvrpcpb.PrewriteRequest{
Mutations: []*kvrpcpb.Mutation{newMutation(op, key, value)},
Mutations: []*kvrpcpb.Mutation{mutation},
PrimaryLock: pk,
StartVersion: startTs,
LockTtl: lockTTL,
MinCommitTs: minCommitTs,
UseAsyncCommit: useAsyncCommit,
Secondaries: secondaries,
AssertionLevel: assertionLevel,
}
return store.MvccStore.prewriteOptimistic(store.newReqCtx(), prewriteReq.Mutations, prewriteReq)
}

// PrewritePessimistic raises pessmistic prewrite requests
// PrewritePessimistic raises pessimistic prewrite requests
func PrewritePessimistic(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
isPessimisticLock []bool, forUpdateTs uint64, store *TestStore) error {
return PrewritePessimisticWithAssertion(pk, key, value, startTs, lockTTL, isPessimisticLock, forUpdateTs,
kvrpcpb.Assertion_None, kvrpcpb.AssertionLevel_Off, store)
}

// PrewritePessimisticWithAssertion raises pessimistic prewrite requests, with specified assertion config
func PrewritePessimisticWithAssertion(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
isPessimisticLock []bool, forUpdateTs uint64, assertion kvrpcpb.Assertion, assertionLevel kvrpcpb.AssertionLevel,
store *TestStore) error {
mutation := newMutation(kvrpcpb.Op_Put, key, value)
mutation.Assertion = assertion
prewriteReq := &kvrpcpb.PrewriteRequest{
Mutations: []*kvrpcpb.Mutation{newMutation(kvrpcpb.Op_Put, key, value)},
Mutations: []*kvrpcpb.Mutation{mutation},
PrimaryLock: pk,
StartVersion: startTs,
LockTtl: lockTTL,
IsPessimisticLock: isPessimisticLock,
ForUpdateTs: forUpdateTs,
AssertionLevel: assertionLevel,
}
return store.MvccStore.prewritePessimistic(store.newReqCtx(), prewriteReq.Mutations, prewriteReq)
}
Expand Down Expand Up @@ -1678,3 +1701,102 @@ func TestTiKVRCRead(t *testing.T) {
require.Equal(t, pair.Value, v)
}
}

func TestAssertion(t *testing.T) {
store, close := NewTestStore("TestAssertion", "TestAssertion", t)
defer close()

// Prepare
MustPrewriteOptimistic([]byte("k1"), []byte("k1"), []byte("v1"), 1, 100, 0, store)
MustPrewriteOptimistic([]byte("k1"), []byte("k2"), []byte("v2"), 1, 100, 0, store)
MustPrewriteOptimistic([]byte("k1"), []byte("k3"), []byte("v3"), 1, 100, 0, store)
MustCommit([]byte("k1"), 1, 2, store)
MustCommit([]byte("k2"), 1, 2, store)
MustCommit([]byte("k3"), 1, 2, store)

checkAssertionFailedError := func(err error, disable bool, startTs uint64, key []byte, assertion kvrpcpb.Assertion, existingStartTs uint64, existingCommitTs uint64) {
t.Logf("Check error: %+q", err)
if disable {
require.Nil(t, err)
return
}
require.NotNil(t, err)
e, ok := errors.Cause(err).(*ErrAssertionFailed)
require.True(t, ok)
require.Equal(t, startTs, e.StartTS)
require.Equal(t, key, e.Key)
require.Equal(t, assertion, e.Assertion)
require.Equal(t, existingStartTs, e.ExistingStartTS)
require.Equal(t, existingCommitTs, e.ExistingCommitTS)
}

for _, disable := range []bool{false, true} {
level := kvrpcpb.AssertionLevel_Strict
if disable {
level = kvrpcpb.AssertionLevel_Off
}
// Test with optimistic transaction
err := PrewriteOptimisticWithAssertion([]byte("k1"), []byte("k1"), []byte("v1"), 10, 100, 0, false, nil,
kvrpcpb.Assertion_NotExist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k1"), kvrpcpb.Assertion_NotExist, 1, 2)
err = PrewriteOptimisticWithAssertion([]byte("k11"), []byte("k11"), []byte("v11"), 10, 100, 0, false, nil,
kvrpcpb.Assertion_Exist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k11"), kvrpcpb.Assertion_Exist, 0, 0)

// Test with pessimistic transaction
MustAcquirePessimisticLock([]byte("k2"), []byte("k2"), 10, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k2"), []byte("k2"), []byte("v2"), 10, 100, []bool{true}, 10,
kvrpcpb.Assertion_NotExist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k2"), kvrpcpb.Assertion_NotExist, 1, 2)
MustAcquirePessimisticLock([]byte("k22"), []byte("k22"), 10, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k22"), []byte("k22"), []byte("v22"), 10, 100, []bool{true}, 10,
kvrpcpb.Assertion_Exist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k22"), kvrpcpb.Assertion_Exist, 0, 0)

// Test with pessimistic transaction (non-pessimistic-lock)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k3"), []byte("v3"), 10, 100, []bool{false}, 10,
kvrpcpb.Assertion_NotExist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k3"), kvrpcpb.Assertion_NotExist, 1, 2)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k33"), []byte("v33"), 10, 100, []bool{false}, 10,
kvrpcpb.Assertion_Exist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k33"), kvrpcpb.Assertion_Exist, 0, 0)
}

for _, k := range [][]byte{
[]byte("k1"),
[]byte("k11"),
[]byte("k2"),
[]byte("k22"),
[]byte("k3"),
[]byte("k33"),
} {
MustRollbackKey(k, 10, store)
}

// Test assertion passes
// Test with optimistic transaction
err := PrewriteOptimisticWithAssertion([]byte("k1"), []byte("k1"), []byte("v1"), 20, 100, 0, false, nil,
kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
err = PrewriteOptimisticWithAssertion([]byte("k11"), []byte("k11"), []byte("v11"), 20, 100, 0, false, nil,
kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)

// Test with pessimistic transaction
MustAcquirePessimisticLock([]byte("k2"), []byte("k2"), 20, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k2"), []byte("k2"), []byte("v2"), 20, 100, []bool{true}, 10,
kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
MustAcquirePessimisticLock([]byte("k22"), []byte("k22"), 20, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k22"), []byte("k22"), []byte("v22"), 20, 100, []bool{true}, 10,
kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)

// Test with pessimistic transaction (non-pessimistic-lock)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k3"), []byte("v3"), 20, 100, []bool{false}, 10,
kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k33"), []byte("v33"), 20, 100, []bool{false}, 10,
kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
}
10 changes: 10 additions & 0 deletions store/mockstore/unistore/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,16 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
PrimaryKey: x.PrimaryKey,
},
}
case *ErrAssertionFailed:
return &kvrpcpb.KeyError{
AssertionFailed: &kvrpcpb.AssertionFailed{
StartTs: x.StartTS,
Key: x.Key,
Assertion: x.Assertion,
ExistingStartTs: x.ExistingStartTS,
ExistingCommitTs: x.ExistingCommitTS,
},
}
default:
return &kvrpcpb.KeyError{
Abort: err.Error(),
Expand Down

0 comments on commit a850b04

Please sign in to comment.