Skip to content

Commit

Permalink
ddl: retry prepare RPC when meets region error (#39834)
Browse files Browse the repository at this point in the history
close #39836
  • Loading branch information
Defined2014 authored Dec 19, 2022
1 parent 47f5460 commit 48585a7
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 3 deletions.
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ go_library(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
Expand Down
28 changes: 25 additions & 3 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
Expand Down Expand Up @@ -324,15 +325,36 @@ func SendPrepareFlashbackToVersionRPC(
if err != nil {
return taskStat, err
}
failpoint.Inject("mockPrepareMeetsEpochNotMatch", func(val failpoint.Value) {
if val.(bool) && bo.ErrorsNum() == 0 {
regionErr = &errorpb.Error{
Message: "stale epoch",
EpochNotMatch: &errorpb.EpochNotMatch{},
}
}
})
if regionErr != nil {
return taskStat, errors.Errorf(regionErr.String())
err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String()))
if err != nil {
return taskStat, err
}
continue
}
if resp.Resp == nil {
return taskStat, errors.Errorf("prepare flashback missing resp body")
logutil.BgLogger().Warn("prepare flashback miss resp body", zap.Uint64("region_id", loc.Region.GetID()))
err = bo.Backoff(tikv.BoTiKVRPC(), errors.New("prepare flashback rpc miss resp body"))
if err != nil {
return taskStat, err
}
continue
}
prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse)
if err := prepareFlashbackToVersionResp.GetError(); err != "" {
return taskStat, errors.Errorf(err)
boErr := bo.Backoff(tikv.BoTiKVRPC(), errors.New(err))
if boErr != nil {
return taskStat, boErr
}
continue
}
taskStat.CompletedRegions++
if isLast {
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/brietest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_test(
deps = [
"//config",
"//ddl/util",
"//parser/model",
"//parser/mysql",
"//sessionctx/binloginfo",
"//store/mockstore/mockcopr",
Expand Down
46 changes: 46 additions & 0 deletions tests/realtikvtest/brietest/flashback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/failpoint"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -90,3 +91,48 @@ func TestFlashback(t *testing.T) {
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
}
}

func TestPrepareFlashbackFailed(t *testing.T) {
if *realtikvtest.WithRealTiKV {
store := realtikvtest.CreateMockStoreAndSetup(t)

tk := testkit.NewTestKit(t, store)

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, index i(a))")
tk.MustExec("insert t values (1), (2), (3)")

time.Sleep(1 * time.Second)

ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch", `return(true)`))

tk.MustExec("insert t values (4), (5), (6)")
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))

tk.MustExec("admin check table t")
require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3")
require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3")

jobMeta := tk.MustQuery("select job_meta from mysql.tidb_ddl_history order by job_id desc limit 1").Rows()[0][0].(string)
job := model.Job{}
require.NoError(t, job.Decode([]byte(jobMeta)))
require.Equal(t, job.ErrorCount, int64(0))

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch"))
}
}

0 comments on commit 48585a7

Please sign in to comment.