Skip to content

Commit

Permalink
autoid_service,meta/autoid: distinguish retriable error from non-retr…
Browse files Browse the repository at this point in the history
…iable error (#39215)

close #39214
  • Loading branch information
tiancaiamao authored Nov 30, 2022
1 parent a48fe79 commit a00f0fe
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 22 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=",
version = "v0.0.0-20221129023506-621ec37aac7a",
sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=",
version = "v0.0.0-20221130022225-6c56ac56fe5f",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down
1 change: 1 addition & 0 deletions autoid_service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/autoid",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
Expand Down
26 changes: 22 additions & 4 deletions autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/autoid"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -401,9 +402,16 @@ func (s *Service) getAlloc(dbID, tblID int64, isUnsigned bool) *autoIDValue {

func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*autoid.AutoIDResponse, error) {
if s.leaderShip != nil && !s.leaderShip.IsOwner() {
logutil.BgLogger().Info("[autoid service] Alloc AutoID fail, not leader")
return nil, errors.New("not leader")
}

failpoint.Inject("mockErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mock reload failed"))
}
})

val := s.getAlloc(req.DbID, req.TblID, req.IsUnsigned)

if req.N == 0 {
Expand All @@ -426,10 +434,13 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
val.end = currentEnd
return nil
})
if err != nil {
return &autoid.AutoIDResponse{Errmsg: []byte(err.Error())}, nil
}
return &autoid.AutoIDResponse{
Min: currentEnd,
Max: currentEnd,
}, err
}, nil
}

val.Lock()
Expand All @@ -443,10 +454,13 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
min, max, err = val.alloc4Signed(ctx, s.store, req.DbID, req.TblID, req.IsUnsigned, req.N, req.Increment, req.Offset)
}

if err != nil {
return &autoid.AutoIDResponse{Errmsg: []byte(err.Error())}, nil
}
return &autoid.AutoIDResponse{
Min: min,
Max: max,
}, err
}, nil
}

func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbID, tblID, requiredBase int64, isUnsigned bool) error {
Expand Down Expand Up @@ -478,14 +492,15 @@ func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbI
// req.N = 0 is handled specially: it is used to return the current auto ID value.
func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoid.RebaseResponse, error) {
if s.leaderShip != nil && !s.leaderShip.IsOwner() {
logutil.BgLogger().Info("[autoid service] Rebase() fail, not leader")
return nil, errors.New("not leader")
}

val := s.getAlloc(req.DbID, req.TblID, req.IsUnsigned)
if req.Force {
err := val.forceRebase(ctx, s.store, req.DbID, req.TblID, req.Base, req.IsUnsigned)
if err != nil {
return nil, errors.Trace(err)
return &autoid.RebaseResponse{Errmsg: []byte(err.Error())}, nil
}
}

Expand All @@ -495,5 +510,8 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
} else {
err = val.rebase4Signed(ctx, s.store, req.DbID, req.TblID, req.Base)
}
return &autoid.RebaseResponse{}, err
if err != nil {
return &autoid.RebaseResponse{Errmsg: []byte(err.Error())}, nil
}
return &autoid.RebaseResponse{}, nil
}
1 change: 1 addition & 0 deletions executor/autoidtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_test(
"//sessionctx/variable",
"//testkit",
"//testkit/testutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
Expand Down
13 changes: 13 additions & 0 deletions executor/autoidtest/autoid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"testing"

"github.com/pingcap/failpoint"
ddltestutil "github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -736,3 +737,15 @@ func TestAlterTableAutoIDCache(t *testing.T) {
_, err = tk.Exec("alter table t_473 auto_id_cache = 1")
require.Error(t, err)
}

// TestMockAutoIDServiceError verifies that an insert into an auto_id_cache 1
// table returns an error (rather than retrying forever) while the autoid
// service's "mockErr" failpoint is active.
func TestMockAutoIDServiceError(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("USE test;")
	tk.MustExec("create table t_mock_err (id int key auto_increment) auto_id_cache 1")

	const mockErrFailpoint = "github.com/pingcap/tidb/autoid_service/mockErr"
	// Stop immediately if the failpoint cannot be armed: the assertion below
	// is only meaningful while the mocked error is being injected.
	if err := failpoint.Enable(mockErrFailpoint, `return(true)`); err != nil {
		t.Fatalf("enable failpoint %s: %v", mockErrFailpoint, err)
	}
	defer func() {
		// Report a failed disable so the failpoint is not silently left armed.
		if err := failpoint.Disable(mockErrFailpoint); err != nil {
			t.Errorf("disable failpoint %s: %v", mockErrFailpoint, err)
		}
	}()
	// Covers a bug where the autoid client retried non-retryable errors
	// forever, causing a dead loop.
	tk.MustExecToErr("insert into t_mock_err values (),()") // mock error, instead of dead loop
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down
3 changes: 2 additions & 1 deletion meta/autoid/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//autoid_service",
"//config",
"//errno",
"//kv",
"//meta",
Expand All @@ -31,7 +32,7 @@ go_library(
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//credentials",
"@org_uber_go_zap//:zap",
],
)
Expand Down
40 changes: 29 additions & 11 deletions meta/autoid/autoid_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/autoid"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials"
)

var _ Allocator = &singlePointAlloc{}
Expand Down Expand Up @@ -77,13 +78,23 @@ func (d *clientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien
if err != nil {
return nil, errors.Trace(err)
}

if len(resp.Kvs) == 0 {
return nil, errors.New("autoid service leader not found")
}

addr := string(resp.Kvs[0].Value)
grpcConn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
opt := grpc.WithInsecure()
security := config.GetGlobalConfig().Security
if len(security.ClusterSSLCA) != 0 {
clusterSecurity := security.ClusterSecurity()
tlsConfig, err := clusterSecurity.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
logutil.BgLogger().Info("[autoid client] connect to leader", zap.String("addr", addr))
grpcConn, err := grpc.Dial(addr, opt)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -128,11 +139,14 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.resetConn()
sp.resetConn(err)
goto retry
}
return 0, 0, errors.Trace(err)
}
if len(resp.Errmsg) != 0 {
return 0, 0, errors.Trace(errors.New(string(resp.Errmsg)))
}

du := time.Since(start)
metrics.AutoIDReqDuration.Observe(du.Seconds())
Expand All @@ -142,7 +156,9 @@ retry:

const backoffDuration = 200 * time.Millisecond

func (sp *singlePointAlloc) resetConn() {
func (sp *singlePointAlloc) resetConn(reason error) {
logutil.BgLogger().Info("[autoid client] reset grpc connection",
zap.String("reason", reason.Error()))
var grpcConn *grpc.ClientConn
sp.mu.Lock()
grpcConn = sp.mu.ClientConn
Expand All @@ -153,9 +169,7 @@ func (sp *singlePointAlloc) resetConn() {
if grpcConn != nil {
err := grpcConn.Close()
if err != nil {
logutil.BgLogger().Warn("[autoid client] AllocAutoID grpc error, reconnect", zap.Error(err))
} else {
logutil.BgLogger().Info("[autoid client] AllocAutoID grpc error, reconnect")
logutil.BgLogger().Warn("[autoid client] close grpc connection error", zap.Error(err))
}
}
}
Expand Down Expand Up @@ -186,7 +200,8 @@ retry:
if err != nil {
return errors.Trace(err)
}
_, err = cli.Rebase(ctx, &autoid.RebaseRequest{
var resp *autoid.RebaseResponse
resp, err = cli.Rebase(ctx, &autoid.RebaseRequest{
DbID: sp.dbID,
TblID: sp.tblID,
Base: newBase,
Expand All @@ -196,13 +211,16 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.resetConn()
sp.resetConn(err)
goto retry
}
return errors.Trace(err)
}
if len(resp.Errmsg) != 0 {
return errors.Trace(errors.New(string(resp.Errmsg)))
}
sp.lastAllocated = newBase
return err
return nil
}

// ForceRebase set the next global auto ID to newBase.
Expand Down
4 changes: 3 additions & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,9 @@ func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) {
logutil.BgLogger().Error("tikv store not etcd background", zap.Error(err))
break
}
service := autoid.New(s.statusListener.Addr().String(), etcdAddr, store, ebd.TLSConfig())
selfAddr := s.statusListener.Addr().String()
service := autoid.New(selfAddr, etcdAddr, store, ebd.TLSConfig())
logutil.BgLogger().Info("register auto service at", zap.String("addr", selfAddr))
pb.RegisterAutoIDAllocServer(grpcServer, service)
s.autoIDService = service
break
Expand Down

0 comments on commit a00f0fe

Please sign in to comment.