diff --git a/executor/executor_failpoint_test.go b/executor/executor_failpoint_test.go index 14855c3116c32..70630ad184a4a 100644 --- a/executor/executor_failpoint_test.go +++ b/executor/executor_failpoint_test.go @@ -27,9 +27,11 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/copr" + "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/dbterror/exeerrors" "github.com/pingcap/tidb/util/deadlockhistory" @@ -555,3 +557,31 @@ func TestDeadlocksTable(t *testing.T) { id2+"/2022-06-11 02:03:04.987654/1/203/////201", )) } + +func TestGetMvccByEncodedKeyRegionError(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + h := helper.NewHelper(store.(helper.Storage)) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + schemaVersion := tk.Session().GetDomainInfoSchema().SchemaMetaVersion() + key := m.EncodeSchemaDiffKey(schemaVersion) + + resp, err := h.GetMvccByEncodedKey(key) + require.NoError(t, err) + require.NotNil(t, resp.Info) + require.Equal(t, 1, len(resp.Info.Writes)) + require.Less(t, uint64(0), resp.Info.Writes[0].CommitTs) + commitTs := resp.Info.Writes[0].CommitTs + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/epochNotMatch", "2*return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/epochNotMatch")) + }() + resp, err = h.GetMvccByEncodedKey(key) + require.NoError(t, err) + require.NotNil(t, resp.Info) + require.Equal(t, 1, len(resp.Info.Writes)) + require.Equal(t, commitTs, resp.Info.Writes[0].CommitTs) +} diff --git a/store/helper/helper.go b/store/helper/helper.go index bce970305bec8..9b55ae787e962 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -97,23 +97,45 @@ func NewHelper(store Storage) *Helper { // GetMvccByEncodedKey get the MVCC value by the specific encoded key. func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey) - if err != nil { - return nil, derr.ToTiDBErr(err) - } - + bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) - kvResp, err := h.Store.SendReq(tikv.NewBackofferWithVars(context.Background(), 500, nil), tikvReq, keyLocation.Region, time.Minute) - if err != nil { - logutil.BgLogger().Info("get MVCC by encoded key failed", - zap.Stringer("encodeKey", encodedKey), - zap.Reflect("region", keyLocation.Region), - zap.Stringer("keyLocation", keyLocation), - zap.Reflect("kvResp", kvResp), - zap.Error(err)) - return nil, errors.Trace(err) + for { + keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) + if err != nil { + return nil, derr.ToTiDBErr(err) + } + kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.BgLogger().Info("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + regionErr, err := kvResp.GetRegionError() + if err != nil { + return nil, errors.Trace(err) + } + if regionErr != nil { + if err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())); err != nil { + return nil, err + } + continue + } + mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse) + if errMsg := mvccResp.GetError(); errMsg != "" { + logutil.BgLogger().Info("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.String("error", errMsg)) + return nil, errors.New(errMsg) + } + return mvccResp, nil } - return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil } // MvccKV wraps the key's mvcc info in tikv. diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index b34051d3d3c9a..6fb781e367879 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -67,6 +67,12 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R } }) + failpoint.Inject("epochNotMatch", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})) + } + }) + failpoint.Inject("unistoreRPCClientSendHook", func(val failpoint.Value) { if fn := UnistoreRPCClientSendHook.Load(); val.(bool) && fn != nil { (*fn)(req)