From 4b98ad6f29748705ad1e7bf34eaf9634fdef327b Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 14 Jan 2019 16:53:41 +0800 Subject: [PATCH] *: integrate plugin framework with TiDB (#9006) --- Makefile | 2 +- config/config.go | 7 +++ domain/domain.go | 7 ++- domain/topn_slow_query.go | 4 ++ executor/show.go | 7 +++ infoschema/infoschema.go | 10 +++- infoschema/perfschema/tables.go | 13 +++++ planner/core/planbuilder.go | 4 +- session/session.go | 95 +++++++++++++++++++++++---------- sessionctx/variable/sysvar.go | 10 +++- sessionctx/variable/varsutil.go | 4 ++ tidb-server/main.go | 10 ++++ 12 files changed, 137 insertions(+), 36 deletions(-) diff --git a/Makefile b/Makefile index be1c07d0cccdd..1658f1b02d847 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH))) export PATH := $(path_to_add):$(PATH) GO := GO111MODULE=on go -GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) +GOBUILD := CGO_ENABLED=1 $(GO) build $(BUILD_FLAG) GOTEST := CGO_ENABLED=1 $(GO) test -p 3 OVERALLS := CGO_ENABLED=1 GO111MODULE=on overalls diff --git a/config/config.go b/config/config.go index d62165f4e0a08..3f29d75bd9301 100644 --- a/config/config.go +++ b/config/config.go @@ -74,6 +74,7 @@ type Config struct { TiKVClient TiKVClient `toml:"tikv-client" json:"tikv-client"` Binlog Binlog `toml:"binlog" json:"binlog"` CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"` + Plugin Plugin `toml:"plugin" json:"plugin"` } // Log is the log section of config. @@ -265,6 +266,12 @@ type Binlog struct { BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` } +// Plugin is the config for plugin +type Plugin struct { + Dir string `toml:"dir" json:"dir"` + Load string `toml:"load" json:"load"` +} + var defaultConf = Config{ Host: "0.0.0.0", AdvertiseAddress: "", diff --git a/domain/domain.go b/domain/domain.go index c2d91bfe132cd..367721fa0f1ef 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -378,10 +378,13 @@ func (do *Domain) topNSlowQueryLoop() { do.slowQuery.Append(info) case msg := <-do.slowQuery.msgCh: req := msg.request - if req.Tp == ast.ShowSlowTop { + switch req.Tp { + case ast.ShowSlowTop: msg.result = do.slowQuery.QueryTop(int(req.Count), req.Kind) - } else if req.Tp == ast.ShowSlowRecent { + case ast.ShowSlowRecent: msg.result = do.slowQuery.QueryRecent(int(req.Count)) + default: + msg.result = do.slowQuery.QueryAll() } msg.Done() } diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index 0bf6721454d6c..4689fafad0a0e 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -160,6 +160,10 @@ func (q *topNSlowQueries) Append(info *SlowQueryInfo) { } } +func (q *topNSlowQueries) QueryAll() []*SlowQueryInfo { + return q.recent.data +} + func (q *topNSlowQueries) RemoveExpired(now time.Time) { q.user.RemoveExpired(now, q.period) q.internal.RemoveExpired(now, q.period) diff --git a/executor/show.go b/executor/show.go index 1623ea14e98e1..e0c0f829149d1 100644 --- a/executor/show.go +++ b/executor/show.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/infoschema" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -875,6 +876,12 @@ func (e *ShowExec) fetchShowProcedureStatus() error { } func (e *ShowExec) fetchShowPlugins() error { + tiPlugins := plugin.GetAll() + for _, ps := range tiPlugins { + for _, p := range ps { + e.appendRow([]interface{}{p.Name, p.State.String(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))}) + } + } return nil } diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index d8bdf96b10682..44ab45ff7abfa 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -358,5 +358,13 @@ func initInfoSchemaDB() { // IsMemoryDB checks if the db is in memory. func IsMemoryDB(dbName string) bool { - return dbName == "information_schema" || dbName == "performance_schema" + if dbName == "information_schema" { + return true + } + for _, driver := range drivers { + if driver.DBInfo.Name.L == dbName { + return true + } + } + return false } diff --git a/infoschema/perfschema/tables.go b/infoschema/perfschema/tables.go index e0712aad7514f..dbe3e68155659 100644 --- a/infoschema/perfschema/tables.go +++ b/infoschema/perfschema/tables.go @@ -27,7 +27,20 @@ type perfSchemaTable struct { cols []*table.Column } +var pluginTable = make(map[string]func(autoid.Allocator, *model.TableInfo) (table.Table, error)) + +// RegisterTable registers a new table into TiDB. +func RegisterTable(tableName, sql string, + tableFromMeta func(autoid.Allocator, *model.TableInfo) (table.Table, error)) { + perfSchemaTables = append(perfSchemaTables, sql) + pluginTable[tableName] = tableFromMeta +} + func tableFromMeta(alloc autoid.Allocator, meta *model.TableInfo) (table.Table, error) { + if f, ok := pluginTable[meta.Name.L]; ok { + ret, err := f(alloc, meta) + return ret, err + } return createPerfSchemaTable(meta), nil } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 454b3b977312b..cea8103c07fb1 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1722,9 +1722,9 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) { mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar} case ast.ShowPlugins: - names = []string{"Name", "Status", "Type", "Library", "License"} + names = []string{"Name", "Status", "Type", "Library", "License", "Version"} ftypes = []byte{ - mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, + mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, } case ast.ShowProcessList: names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"} diff --git a/session/session.go b/session/session.go index fb304d5684fd9..419777ae197da 100644 --- a/session/session.go +++ b/session/session.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" @@ -46,6 +47,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" @@ -1276,6 +1278,21 @@ func loadSystemTZ(se *session) (string, error) { // BootstrapSession runs the first time when the TiDB server start. func BootstrapSession(store kv.Storage) (*domain.Domain, error) { + cfg := config.GetGlobalConfig() + if len(cfg.Plugin.Load) > 0 { + err := plugin.Init(context.Background(), plugin.Config{ + Plugins: strings.Split(cfg.Plugin.Load, ","), + PluginDir: cfg.Plugin.Dir, + GlobalSysVar: &variable.SysVars, + PluginVarNames: &variable.PluginVarNames, + }) + if err != nil { + return nil, err + } + } + + initLoadCommonGlobalVarsSQL() + ver := getStoreBootstrapVersion(store) if ver == notBootstrapped { runInBootstrapSession(store, bootstrap) @@ -1446,39 +1463,59 @@ func finishBootstrap(store kv.Storage) { } const quoteCommaQuote = "', '" -const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + - variable.AutocommitVar + quoteCommaQuote + - variable.SQLModeVar + quoteCommaQuote + - variable.MaxAllowedPacket + quoteCommaQuote + - variable.TimeZone + quoteCommaQuote + - variable.BlockEncryptionMode + quoteCommaQuote + - variable.WaitTimeout + quoteCommaQuote + - variable.InteractiveTimeout + quoteCommaQuote + - variable.MaxPreparedStmtCount + quoteCommaQuote + + +var builtinGlobalVariable = []string{ + variable.AutocommitVar, + variable.SQLModeVar, + variable.MaxAllowedPacket, + variable.TimeZone, + variable.BlockEncryptionMode, + variable.WaitTimeout, + variable.InteractiveTimeout, + variable.MaxPreparedStmtCount, /* TiDB specific global variables: */ - variable.TiDBSkipUTF8Check + quoteCommaQuote + - variable.TiDBIndexJoinBatchSize + quoteCommaQuote + - variable.TiDBIndexLookupSize + quoteCommaQuote + - variable.TiDBIndexLookupConcurrency + quoteCommaQuote + - variable.TiDBIndexLookupJoinConcurrency + quoteCommaQuote + - variable.TiDBIndexSerialScanConcurrency + quoteCommaQuote + - variable.TiDBHashJoinConcurrency + quoteCommaQuote + - variable.TiDBProjectionConcurrency + quoteCommaQuote + - variable.TiDBHashAggPartialConcurrency + quoteCommaQuote + - variable.TiDBHashAggFinalConcurrency + quoteCommaQuote + - variable.TiDBBackoffLockFast + quoteCommaQuote + - variable.TiDBConstraintCheckInPlace + quoteCommaQuote + - variable.TiDBOptInSubqToJoinAndAgg + quoteCommaQuote + - variable.TiDBDistSQLScanConcurrency + quoteCommaQuote + - variable.TiDBInitChunkSize + quoteCommaQuote + - variable.TiDBMaxChunkSize + quoteCommaQuote + - variable.TiDBEnableCascadesPlanner + quoteCommaQuote + - variable.TiDBRetryLimit + quoteCommaQuote + - variable.TiDBDisableTxnAutoRetry + quoteCommaQuote + - variable.TiDBEnableWindowFunction + "')" + variable.TiDBSkipUTF8Check, + variable.TiDBIndexJoinBatchSize, + variable.TiDBIndexLookupSize, + variable.TiDBIndexLookupConcurrency, + variable.TiDBIndexLookupJoinConcurrency, + variable.TiDBIndexSerialScanConcurrency, + variable.TiDBHashJoinConcurrency, + variable.TiDBProjectionConcurrency, + variable.TiDBHashAggPartialConcurrency, + variable.TiDBHashAggFinalConcurrency, + variable.TiDBBackoffLockFast, + variable.TiDBConstraintCheckInPlace, + variable.TiDBDDLReorgWorkerCount, + variable.TiDBDDLReorgBatchSize, + variable.TiDBOptInSubqToJoinAndAgg, + variable.TiDBDistSQLScanConcurrency, + variable.TiDBInitChunkSize, + variable.TiDBMaxChunkSize, + variable.TiDBEnableCascadesPlanner, + variable.TiDBRetryLimit, + variable.TiDBDisableTxnAutoRetry, + variable.TiDBEnableWindowFunction, +} + +var ( + loadCommonGlobalVarsSQLOnce sync.Once + loadCommonGlobalVarsSQL string +) + +func initLoadCommonGlobalVarsSQL() { + loadCommonGlobalVarsSQLOnce.Do(func() { + vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) + if len(variable.PluginVarNames) > 0 { + vars = append(vars, variable.PluginVarNames...) + } + loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')" + }) +} // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { + initLoadCommonGlobalVarsSQL() vars := s.sessionVars if vars.CommonGlobalLoaded { return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 1814985a7b084..9befa39267a1c 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -57,6 +57,9 @@ func GetSysVar(name string) *SysVar { return SysVars[name] } +// PluginVarNames is global plugin var names set. +var PluginVarNames []string + // Variable error codes. const ( CodeUnknownStatusVar terror.ErrCode = 1 @@ -315,7 +318,8 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, "sort_buffer_size", "262144"}, {ScopeGlobal, "innodb_flush_neighbors", "1"}, {ScopeNone, "innodb_use_sys_malloc", "ON"}, - {ScopeNone, "plugin_dir", "/usr/local/mysql/lib/plugin/"}, + {ScopeSession, PluginLoad, ""}, + {ScopeSession, PluginDir, "/data/deploy/plugin"}, {ScopeNone, "performance_schema_max_socket_classes", "10"}, {ScopeNone, "performance_schema_max_stage_classes", "150"}, {ScopeGlobal, "innodb_purge_batch_size", "300"}, @@ -789,6 +793,10 @@ const ( ValidatePasswordNumberCount = "validate_password_number_count" // ValidatePasswordLength is the name of 'validate_password_length' system variable. ValidatePasswordLength = "validate_password_length" + // PluginDir is the name of 'plugin_dir' system variable. + PluginDir = "plugin_dir" + // PluginLoad is the name of 'plugin_load' system variable. + PluginLoad = "plugin_load" ) // GlobalVarAccessor is the interface for accessing global scope system and status variables. diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 9c20d2e4d261a..89d498c5e1128 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -106,6 +106,10 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) { return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.SlowThreshold), 10), true, nil case TiDBQueryLogMaxLen: return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.QueryLogMaxLen), 10), true, nil + case PluginDir: + return config.GetGlobalConfig().Plugin.Dir, true, nil + case PluginLoad: + return config.GetGlobalConfig().Plugin.Load, true, nil } sVal, ok := s.systems[key] if ok { diff --git a/tidb-server/main.go b/tidb-server/main.go index 8a6cb9a704205..c074779832baa 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -80,6 +80,8 @@ const ( nmMetricsInterval = "metrics-interval" nmDdlLease = "lease" nmTokenLimit = "token-limit" + nmPluginDir = "plugin-dir" + nmPluginLoad = "plugin-load" nmProxyProtocolNetworks = "proxy-protocol-networks" nmProxyProtocolHeaderTimeout = "proxy-protocol-header-timeout" @@ -101,6 +103,8 @@ var ( runDDL = flagBoolean(nmRunDDL, true, "run ddl worker on this tidb-server") ddlLease = flag.String(nmDdlLease, "45s", "schema lease duration, very dangerous to change only if you know what you do") tokenLimit = flag.Int(nmTokenLimit, 1000, "the limit of concurrent executed sessions") + pluginDir = flag.String(nmPluginDir, "/data/deploy/plugin", "the folder that hold plugin") + pluginLoad = flag.String(nmPluginLoad, "", "wait load plugin name(seperated by comma)") // Log logLevel = flag.String(nmLogLevel, "info", "log level: info, debug, warn, error, fatal") @@ -323,6 +327,12 @@ func overrideConfig() { if actualFlags[nmTokenLimit] { cfg.TokenLimit = uint(*tokenLimit) } + if actualFlags[nmPluginLoad] { + cfg.Plugin.Load = *pluginLoad + } + if actualFlags[nmPluginDir] { + cfg.Plugin.Dir = *pluginDir + } // Log if actualFlags[nmLogLevel] {