Skip to content

Commit

Permalink
Merge branch 'master' into healthy-distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Dec 14, 2022
2 parents d9d6495 + d2eca72 commit cd52df4
Show file tree
Hide file tree
Showing 239 changed files with 20,171 additions and 11,886 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3527,8 +3527,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:Nr2EhvqkOE9xFyU7LV9c9EbsgN3OzVALdbfobK7Fmn4=",
version = "v2.0.3-0.20221205084317-ad59ca833a78",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "ae013bf35bd23234d1dea46b079f1e05ba74ac0321423830119d3e787ec73483",
sha256 = "56d8c5a5c91e1af73eca71a6fab2ced959b67c86d12ba37feedb0a2dfea441a6",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
],
)

Expand Down
105 changes: 0 additions & 105 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,108 +1388,3 @@ func TestDropBindBySQLDigest(t *testing.T) {
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for '1'")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")
}

func TestCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(id int primary key, a int, b int, key(a))")
tk.MustExec("create table t2(id int primary key, a int, b int, key(a))")

var testCases = []struct {
sqls []string
hint string
}{
{
sqls: []string{
"select %s * from t1, t2 where t1.id = t2.id",
"select %s * from test.t1, t2 where t1.id = t2.id",
"select %s * from test.t1, test.t2 where t1.id = t2.id",
"select %s * from t1, test.t2 where t1.id = t2.id",
},
hint: "/*+ merge_join(t1, t2) */",
},
{
sqls: []string{
"select %s * from t1 where a = 1",
"select %s * from test.t1 where a = 1",
},
hint: "/*+ ignore_index(t, a) */",
},
}

for _, testCase := range testCases {
for _, bind := range testCase.sqls {
stmtsummary.StmtSummaryByDigestMap.Clear()
bindSQL := fmt.Sprintf(bind, testCase.hint)
tk.MustExec(bindSQL)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", bindSQL)).Rows()
tk.MustExec(fmt.Sprintf("create session binding from history using plan digest '%s'", planDigest[0][0]))
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
for _, sql := range testCase.sqls {
tk.MustExec(fmt.Sprintf(sql, ""))
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}
}
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
tk.MustExec(fmt.Sprintf("drop binding for sql digest '%s'", showRes[0][9]))
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", "1"), "can't find any plans for '1'")
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", ""), "plan digest is empty")
tk.MustExec("create binding for select * from t1, t2 where t1.id = t2.id using select /*+ merge_join(t1, t2) */ * from t1, t2 where t1.id = t2.id")
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, showRes[0][10], "") // plan digest should be nil by create for
}

func TestCreateBindingForPrepareFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, a int, key(a))")

tk.MustExec("prepare stmt from 'select /*+ ignore_index(t,a) */ * from t where a = ?'")
tk.MustExec("set @a = 1")
tk.MustExec("execute stmt using @a")
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", "select /*+ ignore_index(t,a) */ * from t where a = ? [arguments: 1]")).Rows()
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 0)
tk.MustExec(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]))
showRes = tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
tk.MustExec("execute stmt using @a")
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}

func TestErrorCasesCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
tk.MustExec("create table t3(id int)")

sql := "select * from t1 where t1.id in (select id from t2)"
tk.MustExec(sql)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with sub query")

sql = "select * from t1, t2, t3 where t1.id = t2.id and t2.id = t3.id"
tk.MustExec(sql)
planDigest = tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with more than two table join")
}
2 changes: 1 addition & 1 deletion br/COMPATIBILITY_TEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Background

We had some incompatibility issues in the past, which made BR cannot restore backed up data in some situations.
So we need a test workflow to check the compatiblity.
So we need a test workflow to check the compatibility.

## Goal

Expand Down
2 changes: 1 addition & 1 deletion br/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func run() error {
if err != nil {
return err
}
if err = cfg.TiDB.Security.RegisterMySQL(); err != nil {
if err = cfg.TiDB.Security.BuildTLSConfig(); err != nil {
return err
}

Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ go_library(
"//br/pkg/version/build",
"//expression",
"//planner/core",
"//util",
"//util/promutil",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
Expand Down
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
return pebble.Open(dbPath, opts)
}

var (
// RunInTest indicates whether the current process is running in test.
RunInTest bool
// LastAlloc is the last ID allocator.
LastAlloc manual.Allocator
)

// NewLocalBackend creates new connections to tikv.
func NewLocalBackend(
ctx context.Context,
Expand Down Expand Up @@ -461,6 +468,11 @@ func NewLocalBackend(
} else {
writeLimiter = noopStoreWriteLimiter{}
}
alloc := manual.Allocator{}
if RunInTest {
alloc.RefCnt = new(atomic.Int64)
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
Expand All @@ -486,7 +498,7 @@ func NewLocalBackend(
keyAdapter: keyAdapter,
errorMgr: errorMgr,
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)),
writeLimiter: writeLimiter,
logger: log.FromContext(ctx),
encBuilder: NewEncodingBuilder(ctx),
Expand Down
26 changes: 17 additions & 9 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common

import (
"context"
"crypto/tls"
"database/sql"
"encoding/base64"
"encoding/json"
Expand Down Expand Up @@ -47,14 +48,16 @@ const (

// MySQLConnectParam records the parameters needed to connect to a MySQL database.
type MySQLConnectParam struct {
Host string
Port int
User string
Password string
SQLMode string
MaxAllowedPacket uint64
TLS string
Vars map[string]string
Host string
Port int
User string
Password string
SQLMode string
MaxAllowedPacket uint64
TLSConfig *tls.Config
AllowFallbackToPlaintext bool
Net string
Vars map[string]string
}

func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
Expand All @@ -64,11 +67,16 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
cfg.User = param.User
cfg.Passwd = param.Password
cfg.Net = "tcp"
if param.Net != "" {
cfg.Net = param.Net
}
cfg.Addr = net.JoinHostPort(param.Host, strconv.Itoa(param.Port))
cfg.Params["charset"] = "utf8mb4"
cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode)
cfg.MaxAllowedPacket = int(param.MaxAllowedPacket)
cfg.TLSConfig = param.TLS

cfg.TLS = param.TLSConfig
cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext

for k, v := range param.Vars {
cfg.Params[k] = fmt.Sprintf("'%s'", v)
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ go_library(
"@com_github_carlmjohnson_flagext//:flagext",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
73 changes: 36 additions & 37 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"math"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/BurntSushi/toml"
"github.com/docker/go-units"
gomysql "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
Expand Down Expand Up @@ -135,6 +135,9 @@ type DBStore struct {
IndexSerialScanConcurrency int `toml:"index-serial-scan-concurrency" json:"index-serial-scan-concurrency"`
ChecksumTableConcurrency int `toml:"checksum-table-concurrency" json:"checksum-table-concurrency"`
Vars map[string]string `toml:"-" json:"vars"`

IOTotalBytes *atomic.Uint64 `toml:"-" json:"-"`
UUID string `toml:"-" json:"-"`
}

type Config struct {
Expand Down Expand Up @@ -575,20 +578,18 @@ type Security struct {
// RedactInfoLog indicates that whether enabling redact log
RedactInfoLog bool `toml:"redact-info-log" json:"redact-info-log"`

// TLSConfigName is used to set tls config for lightning in DM, so we don't expose this field to user
// DM may running many lightning instances at same time, so we need to set different tls config name for each lightning
TLSConfigName string `toml:"-" json:"-"`
TLSConfig *tls.Config `toml:"-" json:"-"`
AllowFallbackToPlaintext bool `toml:"-" json:"-"`

// When DM/engine uses lightning as a library, it can directly pass in the content
CABytes []byte `toml:"-" json:"-"`
CertBytes []byte `toml:"-" json:"-"`
KeyBytes []byte `toml:"-" json:"-"`
}

// RegisterMySQL registers the TLS config with name "cluster" or security.TLSConfigName
// for use in `sql.Open()`. This method is goroutine-safe.
func (sec *Security) RegisterMySQL() error {
if sec == nil {
// BuildTLSConfig builds the tls config which is used by SQL drier later.
func (sec *Security) BuildTLSConfig() error {
if sec == nil || sec.TLSConfig != nil {
return nil
}

Expand All @@ -601,21 +602,10 @@ func (sec *Security) RegisterMySQL() error {
if err != nil {
return errors.Trace(err)
}
if tlsConfig != nil {
// error happens only when the key coincides with the built-in names.
_ = gomysql.RegisterTLSConfig(sec.TLSConfigName, tlsConfig)
}
sec.TLSConfig = tlsConfig
return nil
}

// DeregisterMySQL deregisters the TLS config with security.TLSConfigName
func (sec *Security) DeregisterMySQL() {
if sec == nil || len(sec.CAPath) == 0 {
return
}
gomysql.DeregisterTLSConfig(sec.TLSConfigName)
}

// A duration which can be deserialized from a TOML string.
// Implemented as https://github.com/BurntSushi/toml#using-the-encodingtextunmarshaler-interface
type Duration struct {
Expand Down Expand Up @@ -1136,18 +1126,27 @@ func (cfg *Config) AdjustCheckPoint() {
switch cfg.Checkpoint.Driver {
case CheckpointDriverMySQL:
param := common.MySQLConnectParam{
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
User: cfg.TiDB.User,
Password: cfg.TiDB.Psw,
SQLMode: mysql.DefaultSQLMode,
MaxAllowedPacket: defaultMaxAllowedPacket,
TLS: cfg.TiDB.TLS,
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
User: cfg.TiDB.User,
Password: cfg.TiDB.Psw,
SQLMode: mysql.DefaultSQLMode,
MaxAllowedPacket: defaultMaxAllowedPacket,
TLSConfig: cfg.TiDB.Security.TLSConfig,
AllowFallbackToPlaintext: cfg.TiDB.Security.AllowFallbackToPlaintext,
}
cfg.Checkpoint.MySQLParam = &param
case CheckpointDriverFile:
cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb"
}
} else {
// try to remove allowAllFiles
mysqlCfg, err := gomysql.ParseDSN(cfg.Checkpoint.DSN)
if err != nil {
return
}
mysqlCfg.AllowAllFiles = false
cfg.Checkpoint.DSN = mysqlCfg.FormatDSN()
}
}

Expand Down Expand Up @@ -1180,22 +1179,22 @@ func (cfg *Config) CheckAndAdjustSecurity() error {
}

switch cfg.TiDB.TLS {
case "":
if len(cfg.TiDB.Security.CAPath) > 0 || len(cfg.TiDB.Security.CABytes) > 0 ||
len(cfg.TiDB.Security.CertPath) > 0 || len(cfg.TiDB.Security.CertBytes) > 0 ||
len(cfg.TiDB.Security.KeyPath) > 0 || len(cfg.TiDB.Security.KeyBytes) > 0 {
if cfg.TiDB.Security.TLSConfigName == "" {
cfg.TiDB.Security.TLSConfigName = uuid.NewString() // adjust this the default value
case "skip-verify", "preferred":
if cfg.TiDB.Security.TLSConfig == nil {
/* #nosec G402 */
cfg.TiDB.Security.TLSConfig = &tls.Config{
MinVersion: tls.VersionTLS10,
InsecureSkipVerify: true,
NextProtos: []string{"h2", "http/1.1"}, // specify `h2` to let Go use HTTP/2.
}
cfg.TiDB.TLS = cfg.TiDB.Security.TLSConfigName
} else {
cfg.TiDB.TLS = "false"
cfg.TiDB.Security.AllowFallbackToPlaintext = true
}
case "cluster":
if len(cfg.Security.CAPath) == 0 {
return common.ErrInvalidConfig.GenWithStack("cannot set `tidb.tls` to 'cluster' without a [security] section")
}
case "false", "skip-verify", "preferred":
case "", "false":
cfg.TiDB.TLS = "false"
return nil
default:
return common.ErrInvalidConfig.GenWithStack("unsupported `tidb.tls` config %s", cfg.TiDB.TLS)
Expand Down
Loading

0 comments on commit cd52df4

Please sign in to comment.