Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

master(dm): add cluster id as unique identification of dm-cluster and optimize error message of code:30008 #4493

Merged
merged 15 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ ErrRelayBinlogNameNotValid,[code=30004:class=relay-unit:scope=internal:level=hig
ErrRelayNoCurrentUUID,[code=30005:class=relay-unit:scope=internal:level=high], "Message: no current UUID set"
ErrRelayFlushLocalMeta,[code=30006:class=relay-unit:scope=internal:level=high], "Message: flush local meta"
ErrRelayUpdateIndexFile,[code=30007:class=relay-unit:scope=internal:level=high], "Message: update UUID index file %s"
ErrRelayLogDirpathEmpty,[code=30008:class=relay-unit:scope=internal:level=high], "Message: dirpath is empty, Workaround: Please check the `relay-dir` config in source config file."
ErrRelayLogDirpathEmpty,[code=30008:class=relay-unit:scope=internal:level=high], "Message: dirpath is empty, Workaround: Please check the `relay-dir` config in source config file or dm-worker config file."
ErrRelayReaderNotStateNew,[code=30009:class=relay-unit:scope=internal:level=high], "Message: stage %s, expect %s, already started"
ErrRelayReaderStateCannotClose,[code=30010:class=relay-unit:scope=internal:level=high], "Message: stage %s, expect %s, can not close"
ErrRelayReaderNeedStart,[code=30011:class=relay-unit:scope=internal:level=high], "Message: stage %s, expect %s"
Expand Down Expand Up @@ -396,6 +396,7 @@ ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=h
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
ErrMasterOptimisticDownstreamMetaNotFound,[code=38056:class=dm-master:scope=internal:level=high], "Message: downstream database config and meta for task %s not found"
ErrMasterInvalidClusterID,[code=38057:class=dm-master:scope=internal:level=high], "Message: invalid cluster id: %v"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
Expand Down
3 changes: 3 additions & 0 deletions dm/dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ var (
useOfClosedErrMsg = "use of closed network connection"
// ClusterVersionKey is used to store the version of the cluster.
ClusterVersionKey = "/dm-cluster/version"
// ClusterIDKey is used to store the cluster id of the whole dm cluster. Cluster id is the unique identification of dm cluster
// After leader of dm master bootstraped, the leader will get the id from etcd or generate fresh one, and backfill to etcd.
ClusterIDKey = "/dm-cluster/id"
// WorkerRegisterKeyAdapter is used to encode and decode register key.
// k/v: Encode(worker-name) -> the information of the DM-worker node.
WorkerRegisterKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/r/")
Expand Down
6 changes: 6 additions & 0 deletions dm/dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ func (s *Server) startLeaderComponent(ctx context.Context) bool {
return false
}

err = s.initClusterID(ctx)
if err != nil {
log.L().Error("init cluster id failed", zap.Error(err))
return false
}

failpoint.Inject("FailToStartLeader", func(val failpoint.Value) {
masterStrings := val.(string)
if strings.Contains(masterStrings, s.cfg.Name) {
Expand Down
4 changes: 4 additions & 0 deletions dm/dm/master/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
_, leaderID, _, err := s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg1.Name)
c.Assert(s1.ClusterID(), check.Greater, uint64(0))
c.Assert(s2.ClusterID(), check.Equals, uint64(0))

// fail to start scheduler/pessimism/optimism
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader", `return("dm-master-2")`), check.IsNil)
Expand All @@ -89,6 +91,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
_, leaderID, _, err = s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg1.Name)
clusterID := s1.ClusterID()

//nolint:errcheck
failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader")
Expand All @@ -99,6 +102,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
_, leaderID, _, err = s2.election.LeaderInfo(ctx)
c.Assert(err, check.IsNil)
c.Assert(leaderID, check.Equals, cfg2.Name)
c.Assert(clusterID, check.Equals, s2.ClusterID())

cancel()
}
7 changes: 7 additions & 0 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,13 @@ func (s *Server) DMAPUpdateTaskTemplate(c *gin.Context, taskName string) {
c.IndentedJSON(http.StatusOK, task)
}

// DMAPIGetClusterInfo return cluster id of dm cluster.
func (s *Server) DMAPIGetClusterInfo(c *gin.Context) {
r := &openapi.GetClusterInfoResponse{}
r.ClusterId = s.ClusterID()
c.IndentedJSON(http.StatusOK, r)
}

func terrorHTTPErrorHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
Expand Down
9 changes: 9 additions & 0 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,15 @@ func (t *openAPISuite) TestClusterAPI(c *check.C) {
c.Assert(resultMasters.Data[0].Leader, check.IsTrue)
c.Assert(resultMasters.Data[0].Alive, check.IsTrue)

// check cluster id
clusterIDURL := baseURL + "info"
resp := testutil.NewRequest().Get(clusterIDURL).GoWithHTTPHandler(t.testT, s1.openapiHandles)
c.Assert(resp.Code(), check.Equals, http.StatusOK)
var clusterIDResp openapi.GetClusterInfoResponse
err = resp.UnmarshalBodyToObject(&clusterIDResp)
c.Assert(err, check.IsNil)
c.Assert(clusterIDResp.ClusterId, check.Greater, uint64(0))

// offline master-2 with retry
// operate etcd cluster may met `etcdserver: unhealthy cluster`, add some retry
for i := 0; i < 20; i++ {
Expand Down
43 changes: 43 additions & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package master

import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"net"
"net/http"
"reflect"
Expand Down Expand Up @@ -121,6 +123,8 @@ type Server struct {
closed atomic.Bool

openapiHandles *gin.Engine // injected in `InitOpenAPIHandles`

clusterID atomic.Uint64
}

// NewServer creates a new Server.
Expand Down Expand Up @@ -405,6 +409,45 @@ func subtaskCfgPointersToInstances(stCfgPointers ...*config.SubTaskConfig) []con
return stCfgs
}

func (s *Server) initClusterID(ctx context.Context) error {
log.L().Info("init cluster id begin")
ctx1, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := s.etcdClient.Get(ctx1, dmcommon.ClusterIDKey)
if err != nil {
return err
}

// New cluster, generate a cluster id and backfill it to etcd
if len(resp.Kvs) == 0 {
ts := uint64(time.Now().Unix())
clusterID := (ts << 32) + uint64(rand.Uint32())
clusterIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(clusterIDBytes, clusterID)
_, err = s.etcdClient.Put(ctx1, dmcommon.ClusterIDKey, string(clusterIDBytes))
if err != nil {
return err
}
s.clusterID.Store(clusterID)
log.L().Info("generate and init cluster id success", zap.Uint64("cluster_id", s.clusterID.Load()))
return nil
}

if len(resp.Kvs[0].Value) != 8 {
return terror.ErrMasterInvalidClusterID.Generate(resp.Kvs[0].Value)
}

s.clusterID.Store(binary.BigEndian.Uint64(resp.Kvs[0].Value))
log.L().Info("init cluster id success", zap.Uint64("cluster_id", s.clusterID.Load()))
return nil
}

// ClusterID return correct cluster id when as leader.
func (s *Server) ClusterID() uint64 {
return s.clusterID.Load()
}

// StartTask implements MasterServer.StartTask.
func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.StartTaskResponse, error) {
var (
Expand Down
8 changes: 7 additions & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1285,7 +1285,7 @@ tags = ["internal", "high"]
[error.DM-relay-unit-30008]
message = "dirpath is empty"
description = ""
workaround = "Please check the `relay-dir` config in source config file."
workaround = "Please check the `relay-dir` config in source config file or dm-worker config file."
tags = ["internal", "high"]

[error.DM-relay-unit-30009]
Expand Down Expand Up @@ -2386,6 +2386,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-master-38057]
message = "invalid cluster id: %v"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-worker-40001]
message = "parse dm-worker config flag set"
description = ""
Expand Down
101 changes: 101 additions & 0 deletions dm/openapi/gen.client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading