Skip to content

Commit

Permalink
*: Setup Global Resource Controller (#40732)
Browse files Browse the repository at this point in the history
close #40731
  • Loading branch information
nolouch authored Jan 20, 2023
1 parent c8dc908 commit 5b82653
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 9 deletions.
12 changes: 12 additions & 0 deletions domain/domain_sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (do *Domain) initDomainSysVars() {

variable.SetExternalTimestamp = do.setExternalTimestamp
variable.GetExternalTimestamp = do.getExternalTimestamp

setGlobalResourceControlFunc := do.setGlobalResourceControl
variable.SetGlobalResourceControl.Store(&setGlobalResourceControlFunc)
}

// setStatsCacheCapacity sets statsCache cap
Expand Down Expand Up @@ -67,6 +70,15 @@ func (do *Domain) setPDClientDynamicOption(name, sVal string) {
}
}

func (do *Domain) setGlobalResourceControl(enable bool) {
if enable {
variable.EnableGlobalResourceControlFunc()
} else {
variable.DisableGlobalResourceControlFunc()
}
logutil.BgLogger().Info("set resource control", zap.Bool("enable", enable))
}

// updatePDClient is used to set the dynamic option into the PD client.
func (do *Domain) updatePDClient(option pd.DynamicOption, val interface{}) error {
store, ok := do.store.(interface{ GetPDClient() pd.Client })
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ require (
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.5-0.20230120021435-f89383775234
github.com/tikv/pd v1.1.0-beta.0.20230119114149-402c2bfee2f3
github.com/tikv/pd/client v0.0.0-20230119115149-5c518d079b93
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down Expand Up @@ -225,7 +226,6 @@ require (
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd v1.1.0-beta.0.20230119114149-402c2bfee2f3 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
4 changes: 2 additions & 2 deletions resourcemanager/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/tidb/util/cpu"
)

// GlobalResourceManager is a global resource manager
var GlobalResourceManager = NewResourceManger()
// InstanceResourceManager is a local instance resource manager
var InstanceResourceManager = NewResourceManger()

// RandomName is to get a random name for register pool. It is just for test.
func RandomName() string {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,7 @@ var defaultSysVars = []*SysVar{
},
{Scope: ScopeGlobal, Name: TiDBEnableResourceControl, Value: BoolToOnOff(DefTiDBEnableResourceControl), Type: TypeBool, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
EnableResourceControl.Store(TiDBOptOn(s))
(*SetGlobalResourceControl.Load())(TiDBOptOn(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(EnableResourceControl.Load()), nil
Expand Down
19 changes: 18 additions & 1 deletion sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,26 +1115,43 @@ func TestSetJobScheduleWindow(t *testing.T) {
}

func TestTiDBEnableResourceControl(t *testing.T) {
// setup the hooks for test
enable := false
EnableGlobalResourceControlFunc = func() { enable = true }
DisableGlobalResourceControlFunc = func() { enable = false }
setGlobalResourceControlFunc := func(enable bool) {
if enable {
EnableGlobalResourceControlFunc()
} else {
DisableGlobalResourceControlFunc()
}
}
SetGlobalResourceControl.Store(&setGlobalResourceControlFunc)

vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
resourceControlEnabled := GetSysVar(TiDBEnableResourceControl)

// Default true
// Default false
require.Equal(t, resourceControlEnabled.Value, Off)
require.Equal(t, enable, false)

// Set to On
err := mock.SetGlobalSysVar(context.Background(), TiDBEnableResourceControl, On)

require.NoError(t, err)
val, err1 := mock.GetGlobalSysVar(TiDBEnableResourceControl)
require.NoError(t, err1)
require.Equal(t, On, val)
require.Equal(t, enable, true)

// Set to off
err = mock.SetGlobalSysVar(context.Background(), TiDBEnableResourceControl, Off)
require.NoError(t, err)
val, err1 = mock.GetGlobalSysVar(TiDBEnableResourceControl)
require.NoError(t, err1)
require.Equal(t, Off, val)
require.Equal(t, enable, false)
}
10 changes: 10 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,16 @@ var (
SetExternalTimestamp func(ctx context.Context, ts uint64) error
// GetExternalTimestamp is the func registered by staleread to get externaltimestamp from pd
GetExternalTimestamp func(ctx context.Context) (uint64, error)
// SetGlobalResourceControl is the func registered by domain to set cluster resource control.
SetGlobalResourceControl atomic.Pointer[func(bool)]
)

// Hooks functions for Cluster Resource Control.
var (
// EnableGlobalResourceControlFunc is the function registered by tikv_driver to set cluster resource control.
EnableGlobalResourceControlFunc func() = func() {}
// DisableGlobalResourceControlFunc is the function registered by tikv_driver to unset cluster resource control.
DisableGlobalResourceControlFunc func() = func() {}
)

func serverMemoryLimitDefaultValue() string {
Expand Down
2 changes: 2 additions & 0 deletions store/driver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//kv",
"//sessionctx/variable",
"//store/copr",
"//store/driver/error",
"//store/driver/txn",
Expand All @@ -19,6 +20,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd//pkg/mcs/resource_manager/client",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
Expand Down
25 changes: 25 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
derr "github.com/pingcap/tidb/store/driver/error"
txn_driver "github.com/pingcap/tidb/store/driver/txn"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
rmclient "github.com/tikv/pd/pkg/mcs/resource_manager/client"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -53,6 +55,10 @@ var mc storeCache
func init() {
mc.cache = make(map[string]*tikvStore)
rand.Seed(time.Now().UnixNano())

// Setup the Hooks to dynamic control global resource controller.
variable.EnableGlobalResourceControlFunc = tikv.EnableResourceControl
variable.DisableGlobalResourceControlFunc = tikv.DisableResourceControl
}

// Option is a function that changes some config of Driver
Expand Down Expand Up @@ -86,6 +92,25 @@ func WithPDClientConfig(client config.PDClient) Option {
}
}

// TrySetupGlobalResourceController tries to setup global resource controller.
func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv.Storage) error {
var (
store *tikvStore
ok bool
)
if store, ok = s.(*tikvStore); !ok {
return errors.New("cannot setup up resource controller, should use tikv storage")
}

control, err := rmclient.NewResourceGroupController(serverID, store.GetPDClient(), rmclient.DefaultRequestUnitConfig())
if err != nil {
return err
}
tikv.SetResourceControlInterceptor(control)
control.Start(ctx)
return nil
}

// TiKVDriver implements engine TiKV.
type TiKVDriver struct {
keyspaceName string
Expand Down
2 changes: 1 addition & 1 deletion testkit/mockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma
err := store.Close()
require.NoError(t, err)
view.Stop()
resourcemanager.GlobalResourceManager.Reset()
resourcemanager.InstanceResourceManager.Reset()
})
return dom
}
Expand Down
8 changes: 6 additions & 2 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,13 @@ func main() {

keyspaceName := config.GetGlobalKeyspaceName()

resourcemanager.GlobalResourceManager.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreAndDomain(keyspaceName)
svr := createServer(storage, dom)
err = driver.TrySetupGlobalResourceController(context.Background(), dom.ServerID(), storage)
if err != nil {
logutil.BgLogger().Warn("failed to setup global resource controller", zap.Error(err))
}

// Register error API is not thread-safe, the caller MUST NOT register errors after initialization.
// To prevent misuse, set a flag to indicate that register new error will panic immediately.
Expand All @@ -233,7 +237,7 @@ func main() {
svr.Close()
cleanup(svr, storage, dom, graceful)
cpuprofile.StopCPUProfiler()
resourcemanager.GlobalResourceManager.Stop()
resourcemanager.InstanceResourceManager.Stop()
close(exited)
})
topsql.SetupTopSQL()
Expand Down
4 changes: 2 additions & 2 deletions util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
result.capacity.Add(size)
result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
result.cond = sync.NewCond(result.lock)
err := resourcemanager.GlobalResourceManager.Register(result, name, component)
err := resourcemanager.InstanceResourceManager.Register(result, name, component)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait() {

close(p.stopCh)
p.release()
defer resourcemanager.GlobalResourceManager.Unregister(p.Name())
defer resourcemanager.InstanceResourceManager.Unregister(p.Name())
for {
// Wait for all workers to exit and all task to be completed.
if p.Running() == 0 && p.heartbeatDone.Load() && p.waitingTask.Load() == 0 {
Expand Down

0 comments on commit 5b82653

Please sign in to comment.