Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-kafka-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 30, 2021
2 parents 0d6eff4 + 7ea445a commit 32f52ce
Show file tree
Hide file tree
Showing 27 changed files with 4,420 additions and 1,886 deletions.
7 changes: 7 additions & 0 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ func (c *Capture) reset(ctx context.Context) error {
c.grpcService.Reset(c.MessageServer)

messageClientConfig := conf.Debug.Messages.ToMessageClientConfig()

// Puts the advertise-addr of the local node to the client config.
// This is for metrics purpose only, so that the receiver knows which
// node the connections are from.
advertiseAddr := conf.AdvertiseAddr
messageClientConfig.AdvertisedAddr = advertiseAddr

c.MessageRouter = p2p.NewMessageRouter(c.info.ID, conf.Security, messageClientConfig)
}

Expand Down
2 changes: 2 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/pkg/db"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -50,6 +51,7 @@ func init() {
initServerMetrics(registry)
actor.InitMetrics(registry)
orchestrator.InitMetrics(registry)
p2p.InitMetrics(registry)
// Sorter metrics
sorter.InitMetrics(registry)
memory.InitMetrics(registry)
Expand Down
3 changes: 2 additions & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal
ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium], "Message: subtasks with name %s need to be operate not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist"
ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerRequireNotRunning,[code=46019:class=scheduler:scope=internal:level=high], "Message: tasks %v on source %s should not be running, Workaround: Please use `pause-task [-s source ...] task` to pause them first."
ErrSchedulerRequireRunningTaskInSyncUnit,[code=46019:class=scheduler:scope=internal:level=high], "Message: running tasks %v to be transferred on source %s should in sync unit, Workaround: Please use `pause-task [-s source ...] task` to pause them first."
ErrSchedulerRelayWorkersBusy,[code=46020:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for sources %s respectively, Workaround: Please use `stop-relay` to stop them, or change your topology."
ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:level=high], "Message: these workers %s have bound for another sources %s respectively, Workaround: Please `start-relay` on free or same source workers."
ErrSchedulerRelayWorkersWrongRelay,[code=46022:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for another sources %s respectively, Workaround: Please correct sources in `stop-relay`."
Expand All @@ -532,6 +532,7 @@ ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:lev
ErrSchedulerStopRelayOnSpecified,[code=46029:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `stop-relay` without worker name now, Workaround: Please specify worker names for `stop-relay`."
ErrSchedulerStartRelayOnBound,[code=46030:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now, Workaround: Please stop relay by `stop-relay` without worker name first."
ErrSchedulerStopRelayOnBound,[code=46031:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now, Workaround: Please use `stop-relay` without worker name."
ErrSchedulerPauseTaskForTransferSource,[code=46032:class=scheduler:scope=internal:level=low], "Message: failed to auto pause tasks %s when transfer-source, Workaround: Please pause task by `dmctl pause-task`."
ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection."
ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line."
ErrCtlLoadTLSCfg,[code=48003:class=dmctl:scope=internal:level=high], "Message: can not load tls config, Workaround: Please ensure that the tls certificate is accessible on the node currently running dmctl."
Expand Down
9 changes: 8 additions & 1 deletion dm/dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewConfig() *Config {

fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.BoolVar(&cfg.printSampleConfig, "print-sample-config", false, "print sample config file of dm-worker")
fs.BoolVar(&cfg.OpenAPI, "openapi", false, "enable openapi")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server and status addr")
fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", `advertise address for client traffic (default "${master-addr}")`)
Expand Down Expand Up @@ -91,7 +92,7 @@ func NewConfig() *Config {
}

type ExperimentalFeatures struct {
OpenAPI bool `toml:"openapi"`
OpenAPI bool `toml:"openapi,omitempty"` // OpenAPI is available in v5.4 as default.
}

// Config is the configuration for dm-master.
Expand Down Expand Up @@ -128,6 +129,7 @@ type Config struct {
AutoCompactionMode string `toml:"auto-compaction-mode" json:"auto-compaction-mode"`
AutoCompactionRetention string `toml:"auto-compaction-retention" json:"auto-compaction-retention"`
QuotaBackendBytes int64 `toml:"quota-backend-bytes" json:"quota-backend-bytes"`
OpenAPI bool `toml:"openapi" json:"openapi"`

// directory path used to store source config files when upgrading from v1.0.x.
// if this path set, DM-master leader will try to upgrade from v1.0.x to the current version.
Expand Down Expand Up @@ -313,6 +315,11 @@ func (c *Config) adjust() error {
c.QuotaBackendBytes = quotaBackendBytesLowerBound
}

if c.ExperimentalFeatures.OpenAPI {
c.OpenAPI = true
c.ExperimentalFeatures.OpenAPI = false
log.L().Warn("openapi is a GA feature and removed from experimental features, so this configuration may have no affect in feature release, please set openapi=true in dm-master config file")
}
return err
}

Expand Down
25 changes: 25 additions & 0 deletions dm/dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (t *testConfigSuite) TestConfig(c *check.C) {
c.Assert(cfg.Join, check.Equals, "")
c.Assert(cfg.String(), check.Matches, fmt.Sprintf("{.*master-addr\":\"%s\".*}", masterAddr))
c.Assert(cfg.ExperimentalFeatures.OpenAPI, check.Equals, false)
c.Assert(cfg.OpenAPI, check.Equals, false)
}
}
}
Expand Down Expand Up @@ -299,3 +300,27 @@ func (t *testConfigSuite) TestAdjustAddr(c *check.C) {
c.Assert(cfg.adjust(), check.IsNil)
c.Assert(cfg.AdvertiseAddr, check.Equals, cfg.MasterAddr)
}

func (t *testConfigSuite) TestAdjustOpenAPI(c *check.C) {
cfg := NewConfig()
c.Assert(cfg.configFromFile(defaultConfigFile), check.IsNil)
c.Assert(cfg.adjust(), check.IsNil)

// test default value
c.Assert(cfg.OpenAPI, check.Equals, false)
c.Assert(cfg.ExperimentalFeatures.OpenAPI, check.Equals, false)

// adjust openapi from experimental-features
cfg.ExperimentalFeatures.OpenAPI = true
c.Assert(cfg.adjust(), check.IsNil)
c.Assert(cfg.OpenAPI, check.Equals, true)
c.Assert(cfg.ExperimentalFeatures.OpenAPI, check.Equals, false)

// test from flags
c.Assert(cfg.Parse([]string{"--openapi=false", "--master-addr=127.0.0.1:8261"}), check.IsNil)
c.Assert(cfg.adjust(), check.IsNil)
c.Assert(cfg.OpenAPI, check.Equals, false)
c.Assert(cfg.Parse([]string{"--openapi=true", "--master-addr=127.0.0.1:8261"}), check.IsNil)
c.Assert(cfg.adjust(), check.IsNil)
c.Assert(cfg.OpenAPI, check.Equals, true)
}
3 changes: 1 addition & 2 deletions dm/dm/master/dm-master.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,5 @@ rpc-timeout = "30s"
rpc-rate-burst = 40
rpc-rate-limit = 10.0

# some experimental features
[experimental]
# openapi feature
openapi = false
2 changes: 1 addition & 1 deletion dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (s *Server) DMAPITransferSource(c *gin.Context, sourceName string) {
_ = c.Error(err)
return
}
if err := s.scheduler.TransferSource(sourceName, req.WorkerName); err != nil {
if err := s.scheduler.TransferSource(c.Request.Context(), sourceName, req.WorkerName); err != nil {
_ = c.Error(err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
cfg1.PeerUrls = tempurl.Alloc()
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)
cfg1.ExperimentalFeatures.OpenAPI = true
cfg1.OpenAPI = true

s1 := NewServer(cfg1)
c.Assert(s1.Start(ctx), check.IsNil)
Expand All @@ -108,7 +108,7 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
cfg2.PeerUrls = tempurl.Alloc()
cfg2.AdvertisePeerUrls = cfg2.PeerUrls
cfg2.Join = cfg1.MasterAddr // join to an existing cluster
cfg2.ExperimentalFeatures.OpenAPI = true
cfg2.OpenAPI = true

s2 := NewServer(cfg2)
c.Assert(s2.Start(ctx), check.IsNil)
Expand Down Expand Up @@ -778,7 +778,7 @@ func setupServer(ctx context.Context, c *check.C) *Server {
cfg1.AdvertisePeerUrls = cfg1.PeerUrls
cfg1.AdvertiseAddr = cfg1.MasterAddr
cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)
cfg1.ExperimentalFeatures.OpenAPI = true
cfg1.OpenAPI = true

s1 := NewServer(cfg1)
c.Assert(s1.Start(ctx), check.IsNil)
Expand Down
Loading

0 comments on commit 32f52ce

Please sign in to comment.