Skip to content

Commit

Permalink
master(dm): add cluster id as unique identification of dm-cluster and…
Browse files Browse the repository at this point in the history
… optimize error message of code:30008 (#4493)

close #3830, close #4370, close #4456
  • Loading branch information
niubell authored Feb 8, 2022
1 parent 5459951 commit da0dda8
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 88 deletions.
3 changes: 2 additions & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ ErrRelayBinlogNameNotValid,[code=30004:class=relay-unit:scope=internal:level=hig
ErrRelayNoCurrentUUID,[code=30005:class=relay-unit:scope=internal:level=high], "Message: no current UUID set"
ErrRelayFlushLocalMeta,[code=30006:class=relay-unit:scope=internal:level=high], "Message: flush local meta"
ErrRelayUpdateIndexFile,[code=30007:class=relay-unit:scope=internal:level=high], "Message: update UUID index file %s"
ErrRelayLogDirpathEmpty,[code=30008:class=relay-unit:scope=internal:level=high], "Message: dirpath is empty, Workaround: Please check the `relay-dir` config in source config file."
ErrRelayLogDirpathEmpty,[code=30008:class=relay-unit:scope=internal:level=high], "Message: dirpath is empty, Workaround: Please check the `relay-dir` config in source config file or dm-worker config file."
ErrRelayReaderNotStateNew,[code=30009:class=relay-unit:scope=internal:level=high], "Message: stage %s, expect %s, already started"
ErrRelayReaderStateCannotClose,[code=30010:class=relay-unit:scope=internal:level=high], "Message: stage %s, expect %s, can not close"
ErrRelayReaderNeedStart,[code=30011:class=relay-unit:scope=internal:level=high], "Message: stage %s, expect %s"
Expand Down Expand Up @@ -396,6 +396,7 @@ ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=h
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
ErrMasterOptimisticDownstreamMetaNotFound,[code=38056:class=dm-master:scope=internal:level=high], "Message: downstream database config and meta for task %s not found"
ErrMasterInvalidClusterID,[code=38057:class=dm-master:scope=internal:level=high], "Message: invalid cluster id: %v"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
Expand Down
3 changes: 3 additions & 0 deletions dm/dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ var (
useOfClosedErrMsg = "use of closed network connection"
// ClusterVersionKey is used to store the version of the cluster.
ClusterVersionKey = "/dm-cluster/version"
// ClusterIDKey is used to store the cluster id of the whole dm cluster. Cluster id is the unique identification of dm cluster
// After leader of dm master bootstraped, the leader will get the id from etcd or generate fresh one, and backfill to etcd.
ClusterIDKey = "/dm-cluster/id"
// WorkerRegisterKeyAdapter is used to encode and decode register key.
// k/v: Encode(worker-name) -> the information of the DM-worker node.
WorkerRegisterKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/r/")
Expand Down
6 changes: 6 additions & 0 deletions dm/dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ func (s *Server) startLeaderComponent(ctx context.Context) bool {
return false
}

err = s.initClusterID(ctx)
if err != nil {
log.L().Error("init cluster id failed", zap.Error(err))
return false
}

failpoint.Inject("FailToStartLeader", func(val failpoint.Value) {
masterStrings := val.(string)
if strings.Contains(masterStrings, s.cfg.Name) {
Expand Down
4 changes: 4 additions & 0 deletions dm/dm/master/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
_, leaderID, _, err := s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg1.Name)
c.Assert(s1.ClusterID(), check.Greater, uint64(0))
c.Assert(s2.ClusterID(), check.Equals, uint64(0))

// fail to start scheduler/pessimism/optimism
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader", `return("dm-master-2")`), check.IsNil)
Expand All @@ -89,6 +91,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
_, leaderID, _, err = s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg1.Name)
clusterID := s1.ClusterID()

//nolint:errcheck
failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader")
Expand All @@ -99,6 +102,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
_, leaderID, _, err = s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg2.Name)
c.Assert(clusterID, check.Equals, s2.ClusterID())

cancel()
}
7 changes: 7 additions & 0 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,13 @@ func (s *Server) DMAPUpdateTaskTemplate(c *gin.Context, taskName string) {
c.IndentedJSON(http.StatusOK, task)
}

// DMAPIGetClusterInfo return cluster id of dm cluster.
func (s *Server) DMAPIGetClusterInfo(c *gin.Context) {
r := &openapi.GetClusterInfoResponse{}
r.ClusterId = s.ClusterID()
c.IndentedJSON(http.StatusOK, r)
}

func terrorHTTPErrorHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
Expand Down
9 changes: 9 additions & 0 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,15 @@ func (t *openAPISuite) TestClusterAPI(c *check.C) {
c.Assert(resultMasters.Data[0].Leader, check.IsTrue)
c.Assert(resultMasters.Data[0].Alive, check.IsTrue)

// check cluster id
clusterIDURL := baseURL + "info"
resp := testutil.NewRequest().Get(clusterIDURL).GoWithHTTPHandler(t.testT, s1.openapiHandles)
c.Assert(resp.Code(), check.Equals, http.StatusOK)
var clusterIDResp openapi.GetClusterInfoResponse
err = resp.UnmarshalBodyToObject(&clusterIDResp)
c.Assert(err, check.IsNil)
c.Assert(clusterIDResp.ClusterId, check.Greater, uint64(0))

// offline master-2 with retry
// operate etcd cluster may met `etcdserver: unhealthy cluster`, add some retry
for i := 0; i < 20; i++ {
Expand Down
43 changes: 43 additions & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package master

import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"net"
"net/http"
"reflect"
Expand Down Expand Up @@ -121,6 +123,8 @@ type Server struct {
closed atomic.Bool

openapiHandles *gin.Engine // injected in `InitOpenAPIHandles`

clusterID atomic.Uint64
}

// NewServer creates a new Server.
Expand Down Expand Up @@ -405,6 +409,45 @@ func subtaskCfgPointersToInstances(stCfgPointers ...*config.SubTaskConfig) []con
return stCfgs
}

func (s *Server) initClusterID(ctx context.Context) error {
log.L().Info("init cluster id begin")
ctx1, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := s.etcdClient.Get(ctx1, dmcommon.ClusterIDKey)
if err != nil {
return err
}

// New cluster, generate a cluster id and backfill it to etcd
if len(resp.Kvs) == 0 {
ts := uint64(time.Now().Unix())
clusterID := (ts << 32) + uint64(rand.Uint32())
clusterIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(clusterIDBytes, clusterID)
_, err = s.etcdClient.Put(ctx1, dmcommon.ClusterIDKey, string(clusterIDBytes))
if err != nil {
return err
}
s.clusterID.Store(clusterID)
log.L().Info("generate and init cluster id success", zap.Uint64("cluster_id", s.clusterID.Load()))
return nil
}

if len(resp.Kvs[0].Value) != 8 {
return terror.ErrMasterInvalidClusterID.Generate(resp.Kvs[0].Value)
}

s.clusterID.Store(binary.BigEndian.Uint64(resp.Kvs[0].Value))
log.L().Info("init cluster id success", zap.Uint64("cluster_id", s.clusterID.Load()))
return nil
}

// ClusterID return correct cluster id when as leader.
func (s *Server) ClusterID() uint64 {
return s.clusterID.Load()
}

// StartTask implements MasterServer.StartTask.
func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.StartTaskResponse, error) {
var (
Expand Down
8 changes: 7 additions & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1285,7 +1285,7 @@ tags = ["internal", "high"]
[error.DM-relay-unit-30008]
message = "dirpath is empty"
description = ""
workaround = "Please check the `relay-dir` config in source config file."
workaround = "Please check the `relay-dir` config in source config file or dm-worker config file."
tags = ["internal", "high"]

[error.DM-relay-unit-30009]
Expand Down Expand Up @@ -2386,6 +2386,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-master-38057]
message = "invalid cluster id: %v"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-worker-40001]
message = "parse dm-worker config flag set"
description = ""
Expand Down
101 changes: 101 additions & 0 deletions dm/openapi/gen.client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit da0dda8

Please sign in to comment.