Skip to content

Commit

Permalink
*: integrate plugin framework with TiDB (#9006) (#9888)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and jackysp committed Mar 25, 2019
1 parent e549680 commit b2492a5
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOBUILD := CGO_ENABLED=1 $(GO) build $(BUILD_FLAG)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3
OVERALLS := CGO_ENABLED=1 overalls
GOVERALLS := goveralls
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Config struct {
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
CheckMb4ValueInUtf8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"`
Plugin Plugin `toml:"plugin" json:"plugin"`
}

// Log is the log section of config.
Expand Down Expand Up @@ -240,6 +241,12 @@ type TiKVClient struct {
CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

// Binlog is the config for binlog.
type Binlog struct {
Enable bool `toml:"enable" json:"enable"`
Expand Down
7 changes: 7 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -807,6 +808,12 @@ func (e *ShowExec) fetchShowProcedureStatus() error {
}

func (e *ShowExec) fetchShowPlugins() error {
tiPlugins := plugin.GetAll()
for _, ps := range tiPlugins {
for _, p := range ps {
e.appendRow([]interface{}{p.Name, p.State.String(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))})
}
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,9 +1631,9 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) {
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeLonglong,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar}
case ast.ShowPlugins:
names = []string{"Name", "Status", "Type", "Library", "License"}
names = []string{"Name", "Status", "Type", "Library", "License", "Version"}
ftypes = []byte{
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
}
case ast.ShowProcessList:
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info", "Mem"}
Expand Down
80 changes: 57 additions & 23 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -1168,6 +1169,21 @@ func loadSystemTZ(se *session) (string, error) {

// BootstrapSession runs the first time when the TiDB server start.
func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
cfg := config.GetGlobalConfig()
if len(cfg.Plugin.Load) > 0 {
err := plugin.Init(context.Background(), plugin.Config{
Plugins: strings.Split(cfg.Plugin.Load, ","),
PluginDir: cfg.Plugin.Dir,
GlobalSysVar: &variable.SysVars,
PluginVarNames: &variable.PluginVarNames,
})
if err != nil {
return nil, err
}
}

initLoadCommonGlobalVarsSQL()

ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand Down Expand Up @@ -1338,33 +1354,51 @@ func finishBootstrap(store kv.Storage) {
}

const quoteCommaQuote = "', '"
const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" +
variable.AutocommitVar + quoteCommaQuote +
variable.SQLModeVar + quoteCommaQuote +
variable.MaxAllowedPacket + quoteCommaQuote +
variable.TimeZone + quoteCommaQuote +
variable.BlockEncryptionMode + quoteCommaQuote +

var builtinGlobalVariable = []string{
variable.AutocommitVar,
variable.SQLModeVar,
variable.MaxAllowedPacket,
variable.TimeZone,
variable.BlockEncryptionMode,
/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check + quoteCommaQuote +
variable.TiDBIndexJoinBatchSize + quoteCommaQuote +
variable.TiDBIndexLookupSize + quoteCommaQuote +
variable.TiDBIndexLookupConcurrency + quoteCommaQuote +
variable.TiDBIndexLookupJoinConcurrency + quoteCommaQuote +
variable.TiDBIndexSerialScanConcurrency + quoteCommaQuote +
variable.TiDBHashJoinConcurrency + quoteCommaQuote +
variable.TiDBProjectionConcurrency + quoteCommaQuote +
variable.TiDBHashAggPartialConcurrency + quoteCommaQuote +
variable.TiDBHashAggFinalConcurrency + quoteCommaQuote +
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBConstraintCheckInPlace + quoteCommaQuote +
variable.TiDBOptInSubqUnFolding + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
variable.TiDBDisableTxnAutoRetry + "')"
variable.TiDBSkipUTF8Check,
variable.TiDBIndexJoinBatchSize,
variable.TiDBIndexLookupSize,
variable.TiDBIndexLookupConcurrency,
variable.TiDBIndexLookupJoinConcurrency,
variable.TiDBIndexSerialScanConcurrency,
variable.TiDBHashJoinConcurrency,
variable.TiDBProjectionConcurrency,
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBBackoffLockFast,
variable.TiDBConstraintCheckInPlace,
variable.TiDBOptInSubqUnFolding,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBMaxChunkSize,
variable.TiDBRetryLimit,
variable.TiDBDisableTxnAutoRetry,
}

var (
loadCommonGlobalVarsSQLOnce sync.Once
loadCommonGlobalVarsSQL string
)

func initLoadCommonGlobalVarsSQL() {
loadCommonGlobalVarsSQLOnce.Do(func() {
vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...)
if len(variable.PluginVarNames) > 0 {
vars = append(vars, variable.PluginVarNames...)
}
loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')"
})
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
initLoadCommonGlobalVarsSQL()
vars := s.sessionVars
if vars.CommonGlobalLoaded {
return nil
Expand Down
10 changes: 9 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func GetSysVar(name string) *SysVar {
return SysVars[name]
}

// PluginVarNames is global plugin var names set.
var PluginVarNames []string

// Variable error codes.
const (
CodeUnknownStatusVar terror.ErrCode = 1
Expand Down Expand Up @@ -312,7 +315,8 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, "sort_buffer_size", "262144"},
{ScopeGlobal, "innodb_flush_neighbors", "1"},
{ScopeNone, "innodb_use_sys_malloc", "ON"},
{ScopeNone, "plugin_dir", "/usr/local/mysql/lib/plugin/"},
{ScopeSession, PluginLoad, ""},
{ScopeNone, PluginDir, "/data/deploy/plugin"},
{ScopeNone, "performance_schema_max_socket_classes", "10"},
{ScopeNone, "performance_schema_max_stage_classes", "150"},
{ScopeGlobal, "innodb_purge_batch_size", "300"},
Expand Down Expand Up @@ -777,6 +781,10 @@ const (
SyncBinlog = "sync_binlog"
// BlockEncryptionMode is the name for 'block_encryption_mode' system variable.
BlockEncryptionMode = "block_encryption_mode"
// PluginDir is the name of 'plugin_dir' system variable.
PluginDir = "plugin_dir"
// PluginLoad is the name of 'plugin_load' system variable.
PluginLoad = "plugin_load"
)

// GlobalVarAccessor is the interface for accessing global scope system and status variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) {
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.SlowThreshold), 10), true, nil
case TiDBQueryLogMaxLen:
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.QueryLogMaxLen), 10), true, nil
case PluginDir:
return config.GetGlobalConfig().Plugin.Dir, true, nil
case PluginLoad:
return config.GetGlobalConfig().Plugin.Load, true, nil
case TiDBCheckMb4ValueInUtf8:
return BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUtf8), true, nil
}
Expand Down
10 changes: 10 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const (
nmMetricsInterval = "metrics-interval"
nmDdlLease = "lease"
nmTokenLimit = "token-limit"
nmPluginDir = "plugin-dir"
nmPluginLoad = "plugin-load"

nmProxyProtocolNetworks = "proxy-protocol-networks"
nmProxyProtocolHeaderTimeout = "proxy-protocol-header-timeout"
Expand All @@ -94,6 +96,8 @@ var (
runDDL = flagBoolean(nmRunDDL, true, "run ddl worker on this tidb-server")
ddlLease = flag.String(nmDdlLease, "45s", "schema lease duration, very dangerous to change only if you know what you do")
tokenLimit = flag.Int(nmTokenLimit, 1000, "the limit of concurrent executed sessions")
pluginDir = flag.String(nmPluginDir, "/data/deploy/plugin", "the folder that hold plugin")
pluginLoad = flag.String(nmPluginLoad, "", "wait load plugin name(seperated by comma)")

// Log
logLevel = flag.String(nmLogLevel, "info", "log level: info, debug, warn, error, fatal")
Expand Down Expand Up @@ -315,6 +319,12 @@ func overrideConfig() {
if actualFlags[nmTokenLimit] {
cfg.TokenLimit = uint(*tokenLimit)
}
if actualFlags[nmPluginLoad] {
cfg.Plugin.Load = *pluginLoad
}
if actualFlags[nmPluginDir] {
cfg.Plugin.Dir = *pluginDir
}

// Log
if actualFlags[nmLogLevel] {
Expand Down

0 comments on commit b2492a5

Please sign in to comment.