Skip to content

Commit

Permalink
*: integrate plugin framework with TiDB (#9006)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and jackysp committed Jan 14, 2019
1 parent 9f346a3 commit 4b98ad6
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOBUILD := CGO_ENABLED=1 $(GO) build $(BUILD_FLAG)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3
OVERALLS := CGO_ENABLED=1 GO111MODULE=on overalls

Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Config struct {
TiKVClient TiKVClient `toml:"tikv-client" json:"tikv-client"`
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Plugin Plugin `toml:"plugin" json:"plugin"`
}

// Log is the log section of config.
Expand Down Expand Up @@ -265,6 +266,12 @@ type Binlog struct {
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

var defaultConf = Config{
Host: "0.0.0.0",
AdvertiseAddress: "",
Expand Down
7 changes: 5 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,13 @@ func (do *Domain) topNSlowQueryLoop() {
do.slowQuery.Append(info)
case msg := <-do.slowQuery.msgCh:
req := msg.request
if req.Tp == ast.ShowSlowTop {
switch req.Tp {
case ast.ShowSlowTop:
msg.result = do.slowQuery.QueryTop(int(req.Count), req.Kind)
} else if req.Tp == ast.ShowSlowRecent {
case ast.ShowSlowRecent:
msg.result = do.slowQuery.QueryRecent(int(req.Count))
default:
msg.result = do.slowQuery.QueryAll()
}
msg.Done()
}
Expand Down
4 changes: 4 additions & 0 deletions domain/topn_slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (q *topNSlowQueries) Append(info *SlowQueryInfo) {
}
}

func (q *topNSlowQueries) QueryAll() []*SlowQueryInfo {
return q.recent.data
}

func (q *topNSlowQueries) RemoveExpired(now time.Time) {
q.user.RemoveExpired(now, q.period)
q.internal.RemoveExpired(now, q.period)
Expand Down
7 changes: 7 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -875,6 +876,12 @@ func (e *ShowExec) fetchShowProcedureStatus() error {
}

func (e *ShowExec) fetchShowPlugins() error {
tiPlugins := plugin.GetAll()
for _, ps := range tiPlugins {
for _, p := range ps {
e.appendRow([]interface{}{p.Name, p.State.String(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))})
}
}
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,5 +358,13 @@ func initInfoSchemaDB() {

// IsMemoryDB checks if the db is in memory.
func IsMemoryDB(dbName string) bool {
return dbName == "information_schema" || dbName == "performance_schema"
if dbName == "information_schema" {
return true
}
for _, driver := range drivers {
if driver.DBInfo.Name.L == dbName {
return true
}
}
return false
}
13 changes: 13 additions & 0 deletions infoschema/perfschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@ type perfSchemaTable struct {
cols []*table.Column
}

var pluginTable = make(map[string]func(autoid.Allocator, *model.TableInfo) (table.Table, error))

// RegisterTable registers a new table into TiDB.
func RegisterTable(tableName, sql string,
tableFromMeta func(autoid.Allocator, *model.TableInfo) (table.Table, error)) {
perfSchemaTables = append(perfSchemaTables, sql)
pluginTable[tableName] = tableFromMeta
}

func tableFromMeta(alloc autoid.Allocator, meta *model.TableInfo) (table.Table, error) {
if f, ok := pluginTable[meta.Name.L]; ok {
ret, err := f(alloc, meta)
return ret, err
}
return createPerfSchemaTable(meta), nil
}

Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1722,9 +1722,9 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) {
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeLonglong,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar}
case ast.ShowPlugins:
names = []string{"Name", "Status", "Type", "Library", "License"}
names = []string{"Name", "Status", "Type", "Library", "License", "Version"}
ftypes = []byte{
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
}
case ast.ShowProcessList:
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
Expand Down
95 changes: 66 additions & 29 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -1276,6 +1278,21 @@ func loadSystemTZ(se *session) (string, error) {

// BootstrapSession runs the first time when the TiDB server start.
func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
cfg := config.GetGlobalConfig()
if len(cfg.Plugin.Load) > 0 {
err := plugin.Init(context.Background(), plugin.Config{
Plugins: strings.Split(cfg.Plugin.Load, ","),
PluginDir: cfg.Plugin.Dir,
GlobalSysVar: &variable.SysVars,
PluginVarNames: &variable.PluginVarNames,
})
if err != nil {
return nil, err
}
}

initLoadCommonGlobalVarsSQL()

ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand Down Expand Up @@ -1446,39 +1463,59 @@ func finishBootstrap(store kv.Storage) {
}

const quoteCommaQuote = "', '"
const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" +
variable.AutocommitVar + quoteCommaQuote +
variable.SQLModeVar + quoteCommaQuote +
variable.MaxAllowedPacket + quoteCommaQuote +
variable.TimeZone + quoteCommaQuote +
variable.BlockEncryptionMode + quoteCommaQuote +
variable.WaitTimeout + quoteCommaQuote +
variable.InteractiveTimeout + quoteCommaQuote +
variable.MaxPreparedStmtCount + quoteCommaQuote +

var builtinGlobalVariable = []string{
variable.AutocommitVar,
variable.SQLModeVar,
variable.MaxAllowedPacket,
variable.TimeZone,
variable.BlockEncryptionMode,
variable.WaitTimeout,
variable.InteractiveTimeout,
variable.MaxPreparedStmtCount,
/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check + quoteCommaQuote +
variable.TiDBIndexJoinBatchSize + quoteCommaQuote +
variable.TiDBIndexLookupSize + quoteCommaQuote +
variable.TiDBIndexLookupConcurrency + quoteCommaQuote +
variable.TiDBIndexLookupJoinConcurrency + quoteCommaQuote +
variable.TiDBIndexSerialScanConcurrency + quoteCommaQuote +
variable.TiDBHashJoinConcurrency + quoteCommaQuote +
variable.TiDBProjectionConcurrency + quoteCommaQuote +
variable.TiDBHashAggPartialConcurrency + quoteCommaQuote +
variable.TiDBHashAggFinalConcurrency + quoteCommaQuote +
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBConstraintCheckInPlace + quoteCommaQuote +
variable.TiDBOptInSubqToJoinAndAgg + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBInitChunkSize + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBEnableCascadesPlanner + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
variable.TiDBDisableTxnAutoRetry + quoteCommaQuote +
variable.TiDBEnableWindowFunction + "')"
variable.TiDBSkipUTF8Check,
variable.TiDBIndexJoinBatchSize,
variable.TiDBIndexLookupSize,
variable.TiDBIndexLookupConcurrency,
variable.TiDBIndexLookupJoinConcurrency,
variable.TiDBIndexSerialScanConcurrency,
variable.TiDBHashJoinConcurrency,
variable.TiDBProjectionConcurrency,
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBBackoffLockFast,
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
variable.TiDBOptInSubqToJoinAndAgg,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBInitChunkSize,
variable.TiDBMaxChunkSize,
variable.TiDBEnableCascadesPlanner,
variable.TiDBRetryLimit,
variable.TiDBDisableTxnAutoRetry,
variable.TiDBEnableWindowFunction,
}

var (
loadCommonGlobalVarsSQLOnce sync.Once
loadCommonGlobalVarsSQL string
)

func initLoadCommonGlobalVarsSQL() {
loadCommonGlobalVarsSQLOnce.Do(func() {
vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...)
if len(variable.PluginVarNames) > 0 {
vars = append(vars, variable.PluginVarNames...)
}
loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')"
})
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
initLoadCommonGlobalVarsSQL()
vars := s.sessionVars
if vars.CommonGlobalLoaded {
return nil
Expand Down
10 changes: 9 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func GetSysVar(name string) *SysVar {
return SysVars[name]
}

// PluginVarNames is global plugin var names set.
var PluginVarNames []string

// Variable error codes.
const (
CodeUnknownStatusVar terror.ErrCode = 1
Expand Down Expand Up @@ -315,7 +318,8 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, "sort_buffer_size", "262144"},
{ScopeGlobal, "innodb_flush_neighbors", "1"},
{ScopeNone, "innodb_use_sys_malloc", "ON"},
{ScopeNone, "plugin_dir", "/usr/local/mysql/lib/plugin/"},
{ScopeSession, PluginLoad, ""},
{ScopeSession, PluginDir, "/data/deploy/plugin"},
{ScopeNone, "performance_schema_max_socket_classes", "10"},
{ScopeNone, "performance_schema_max_stage_classes", "150"},
{ScopeGlobal, "innodb_purge_batch_size", "300"},
Expand Down Expand Up @@ -789,6 +793,10 @@ const (
ValidatePasswordNumberCount = "validate_password_number_count"
// ValidatePasswordLength is the name of 'validate_password_length' system variable.
ValidatePasswordLength = "validate_password_length"
// PluginDir is the name of 'plugin_dir' system variable.
PluginDir = "plugin_dir"
// PluginLoad is the name of 'plugin_load' system variable.
PluginLoad = "plugin_load"
)

// GlobalVarAccessor is the interface for accessing global scope system and status variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) {
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.SlowThreshold), 10), true, nil
case TiDBQueryLogMaxLen:
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.QueryLogMaxLen), 10), true, nil
case PluginDir:
return config.GetGlobalConfig().Plugin.Dir, true, nil
case PluginLoad:
return config.GetGlobalConfig().Plugin.Load, true, nil
}
sVal, ok := s.systems[key]
if ok {
Expand Down
10 changes: 10 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const (
nmMetricsInterval = "metrics-interval"
nmDdlLease = "lease"
nmTokenLimit = "token-limit"
nmPluginDir = "plugin-dir"
nmPluginLoad = "plugin-load"

nmProxyProtocolNetworks = "proxy-protocol-networks"
nmProxyProtocolHeaderTimeout = "proxy-protocol-header-timeout"
Expand All @@ -101,6 +103,8 @@ var (
runDDL = flagBoolean(nmRunDDL, true, "run ddl worker on this tidb-server")
ddlLease = flag.String(nmDdlLease, "45s", "schema lease duration, very dangerous to change only if you know what you do")
tokenLimit = flag.Int(nmTokenLimit, 1000, "the limit of concurrent executed sessions")
pluginDir = flag.String(nmPluginDir, "/data/deploy/plugin", "the folder that hold plugin")
pluginLoad = flag.String(nmPluginLoad, "", "wait load plugin name(seperated by comma)")

// Log
logLevel = flag.String(nmLogLevel, "info", "log level: info, debug, warn, error, fatal")
Expand Down Expand Up @@ -323,6 +327,12 @@ func overrideConfig() {
if actualFlags[nmTokenLimit] {
cfg.TokenLimit = uint(*tokenLimit)
}
if actualFlags[nmPluginLoad] {
cfg.Plugin.Load = *pluginLoad
}
if actualFlags[nmPluginDir] {
cfg.Plugin.Dir = *pluginDir
}

// Log
if actualFlags[nmLogLevel] {
Expand Down

0 comments on commit 4b98ad6

Please sign in to comment.