Skip to content

Commit

Permalink
*: make flow-round-by-digit compatible with trace-region-flow (tikv#3747
Browse files Browse the repository at this point in the history
)

* *: make flow-round-by-digit compatible with trace-region-flow

Signed-off-by: nolouch <[email protected]>

* fix test

Signed-off-by: nolouch <[email protected]>

* fix test

Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch authored and bufferflies committed Jul 9, 2021
1 parent 7554b71 commit ad1dfed
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 12 deletions.
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if c.traceRegionFlow && (region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead()) {
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() {
saveCache, needSync = true, true
}

Expand Down
7 changes: 0 additions & 7 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,13 +622,6 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
processRegions(regions)
newRegion := cluster.GetRegion(region.GetID())
c.Assert(newRegion.GetBytesRead(), Equals, uint64(1000))

// do not trace the flow changes
cluster.traceRegionFlow = false
processRegions([]*core.RegionInfo{region})
newRegion = cluster.GetRegion(region.GetID())
c.Assert(region.GetBytesRead(), Equals, uint64(0))
c.Assert(newRegion.GetBytesRead(), Not(Equals), uint64(0))
}

func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
Expand Down
33 changes: 30 additions & 3 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"flag"
"fmt"
"math"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -1076,9 +1077,9 @@ type PDServerConfig struct {
MetricStorage string `toml:"metric-storage" json:"metric-storage"`
// There are some values supported: "auto", "none", or a specific address, default: "auto"
DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"`
// TraceRegionFlow the option to update flow information of regions
// TODO: deprecate
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"`
// TraceRegionFlow the option to update flow information of regions.
// WARN: TraceRegionFlow is deprecated.
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string,omitempty"`
// FlowRoundByDigit used to discretization processing flow information.
FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"`
}
Expand All @@ -1103,9 +1104,35 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("flow-round-by-digit") {
adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit)
}
c.migrateConfigurationFromFile(meta)
return c.Validate()
}

func (c *PDServerConfig) migrateConfigurationFromFile(meta *configMetaData) error {
oldName, newName := "trace-region-flow", "flow-round-by-digit"
defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName)
switch {
case defineOld && defineNew:
if c.TraceRegionFlow && (c.FlowRoundByDigit == defaultFlowRoundByDigit) {
return errors.Errorf("config item %s and %s(deprecated) are conflict", newName, oldName)
}
case defineOld && !defineNew:
if !c.TraceRegionFlow {
c.FlowRoundByDigit = math.MaxInt8
}
}
return nil
}

// MigrateDeprecatedFlags updates new flags according to deprecated flags.
func (c *PDServerConfig) MigrateDeprecatedFlags() {
if !c.TraceRegionFlow {
c.FlowRoundByDigit = math.MaxInt8
}
// json omity the false. next time will not persist to the kv.
c.TraceRegionFlow = false
}

// Clone returns a cloned PD server config.
func (c *PDServerConfig) Clone() *PDServerConfig {
runtimeServices := append(c.RuntimeServices[:0:0], c.RuntimeServices...)
Expand Down
4 changes: 4 additions & 0 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config
import (
"encoding/json"
"fmt"
"math"
"os"
"path"
"strings"
Expand Down Expand Up @@ -284,13 +285,16 @@ func (s *testConfigSuite) TestMigrateFlags(c *C) {
return cfg, err
}
cfg, err := load(`
[pd-server]
trace-region-flow = false
[schedule]
disable-remove-down-replica = true
enable-make-up-replica = false
disable-remove-extra-replica = true
enable-remove-extra-replica = false
`)
c.Assert(err, IsNil)
c.Assert(cfg.PDServerCfg.FlowRoundByDigit, Equals, math.MaxInt8)
c.Assert(cfg.Schedule.EnableReplaceOfflineReplica, IsTrue)
c.Assert(cfg.Schedule.EnableRemoveDownReplica, IsFalse)
c.Assert(cfg.Schedule.EnableMakeUpReplica, IsFalse)
Expand Down
1 change: 1 addition & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ func (o *PersistOptions) Reload(storage *core.Storage) error {
return err
}
o.adjustScheduleCfg(&cfg.Schedule)
cfg.PDServerCfg.MigrateDeprecatedFlags()
if isExist {
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
Expand Down

0 comments on commit ad1dfed

Please sign in to comment.