Skip to content

Commit

Permalink
Merge branch 'master' into add_field_stmtSummary
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzf678 authored Dec 13, 2022
2 parents 4258045 + 621115b commit dbac95d
Show file tree
Hide file tree
Showing 43 changed files with 437 additions and 200 deletions.
5 changes: 2 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,7 @@ func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error {
return ddlutil.LoadDDLReorgVars(ctx, sCtx)
}

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
dbName := model.NewCIStr(sessCtx.GetSessionVars().CurrentDB)
func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) {
writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols()))
for _, col := range t.WritableCols() {
writableColInfos = append(writableColInfos, col.ColumnInfo)
Expand Down Expand Up @@ -860,7 +859,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic

startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t)
decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 5 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,11 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
8 changes: 8 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,6 +1712,14 @@ func TestCreateExpressionIndex(t *testing.T) {
require.NoError(t, checkErr)
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10"))

// https://github.com/pingcap/tidb/issues/39784
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(name varchar(20))")
tk.MustExec("insert into t values ('Abc'), ('Bcd'), ('abc')")
tk.MustExec("create index idx on test.t((lower(test.t.name)))")
tk.MustExec("admin check table t")
}

func TestCreateUniqueExpressionIndex(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,11 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
6 changes: 5 additions & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,10 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if err != nil {
return ver, errors.Trace(err)
}
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
// If table has global indexes, we need reorg to clean up them.
if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) {
// Build elements for compatible with modify column type. elements will not be used when reorganizing.
Expand All @@ -1753,7 +1757,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
}
}
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, tbl, physicalTableIDs, elements)
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)

if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
Expand Down
7 changes: 5 additions & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ type reorgInfo struct {
// PhysicalTableID is used to trace the current partition we are handling.
// If the table is not partitioned, PhysicalTableID would be TableID.
PhysicalTableID int64
dbInfo *model.DBInfo
elements []*meta.Element
currElement *meta.Element
}
Expand Down Expand Up @@ -585,7 +586,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
return ver, nil
}

func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo,
tbl table.Table, elements []*meta.Element, mergingTmpIdx bool) (*reorgInfo, error) {
var (
element *meta.Element
Expand Down Expand Up @@ -685,11 +686,12 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
info.currElement = element
info.elements = elements
info.mergingTmpIdx = mergingTmpIdx
info.dbInfo = dbInfo

return &info, nil
}

func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
Expand Down Expand Up @@ -745,6 +747,7 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo
info.PhysicalTableID = pid
info.currElement = element
info.elements = elements
info.dbInfo = dbInfo

return &info, nil
}
Expand Down
4 changes: 0 additions & 4 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2457,10 +2457,6 @@ func (do *Domain) runTTLJobManager(ctx context.Context) {
ttlJobManager.Start()
do.ttlJobManager = ttlJobManager

// TODO: read the worker count from `do.sysVarCache` and resize the workers
ttlworker.ScanWorkersCount.Store(4)
ttlworker.DeleteWorkerCount.Store(4)

<-do.exit

ttlJobManager.Stop()
Expand Down
1 change: 1 addition & 0 deletions dumpling/export/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ go_test(
"//util/filter",
"//util/promutil",
"//util/table-filter",
"@com_github_coreos_go_semver//semver",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down
18 changes: 18 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
// import mysql driver
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
Expand Down Expand Up @@ -46,6 +47,10 @@ var openDBFunc = openDB

var errEmptyHandleVals = errors.New("empty handleVals for TiDB table")

// Since TiDB v6.2.0, tidb_enable_paging is enabled by default.
// See https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_enable_paging-%E4%BB%8E-v540-%E7%89%88%E6%9C%AC%E5%BC%80%E5%A7%8B%E5%BC%95%E5%85%A5
var enablePagingVersion = semver.New("6.2.0")

// Dumper is the dump progress structure
type Dumper struct {
tctx *tcontext.Context
Expand Down Expand Up @@ -1539,6 +1544,19 @@ func updateServiceSafePoint(tctx *tcontext.Context, pdClient pd.Client, ttl int6
}
}

// setDefaultSessionParams is an initialization step that seeds sessionParams
// with sensible defaults derived from the server we are dumping from.
// A default is applied only when the caller has not set that variable
// explicitly, so user-provided values always win.
func setDefaultSessionParams(si version.ServerInfo, sessionParams map[string]interface{}) {
	defaults := make(map[string]interface{})
	// TiDB clusters backed by TiKV enable paging by default starting with
	// v6.2.0, so mirror that behavior for the dump session.
	if si.ServerType == version.ServerTypeTiDB && si.HasTiKV && si.ServerVersion.Compare(*enablePagingVersion) >= 0 {
		defaults["tidb_enable_paging"] = "ON"
	}
	for name, value := range defaults {
		if _, userSet := sessionParams[name]; !userSet {
			sessionParams[name] = value
		}
	}
}

// setSessionParam is an initialization step of Dumper.
func setSessionParam(d *Dumper) error {
conf, pool := d.conf, d.dbHandle
Expand Down
65 changes: 65 additions & 0 deletions dumpling/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/version"
tcontext "github.com/pingcap/tidb/dumpling/context"
Expand Down Expand Up @@ -224,3 +225,67 @@ func TestUnregisterMetrics(t *testing.T) {
// should not panic
require.Error(t, err)
}

// TestSetDefaultSessionParams checks that setDefaultSessionParams enables
// tidb_enable_paging only for TiDB servers with TiKV at version >= v6.2.0,
// and that it never overrides a value the user set explicitly.
func TestSetDefaultSessionParams(t *testing.T) {
	testCases := []struct {
		si             version.ServerInfo        // server the dump connects to
		sessionParams  map[string]interface{}    // params as provided by the user; mutated in place
		expectedParams map[string]interface{}    // params after defaults are applied
	}{
		// TiDB with TiKV but below v6.2.0: no paging default is added.
		{
			si: version.ServerInfo{
				ServerType:    version.ServerTypeTiDB,
				HasTiKV:       true,
				ServerVersion: semver.New("6.1.0"),
			},
			sessionParams: map[string]interface{}{
				"tidb_snapshot": "2020-01-01 00:00:00",
			},
			expectedParams: map[string]interface{}{
				"tidb_snapshot": "2020-01-01 00:00:00",
			},
		},
		// TiDB with TiKV at v6.2.0: tidb_enable_paging defaults to ON.
		{
			si: version.ServerInfo{
				ServerType:    version.ServerTypeTiDB,
				HasTiKV:       true,
				ServerVersion: semver.New("6.2.0"),
			},
			sessionParams: map[string]interface{}{
				"tidb_snapshot": "2020-01-01 00:00:00",
			},
			expectedParams: map[string]interface{}{
				"tidb_enable_paging": "ON",
				"tidb_snapshot":      "2020-01-01 00:00:00",
			},
		},
		// User explicitly disabled paging: the default must not override it.
		{
			si: version.ServerInfo{
				ServerType:    version.ServerTypeTiDB,
				HasTiKV:       true,
				ServerVersion: semver.New("6.2.0"),
			},
			sessionParams: map[string]interface{}{
				"tidb_enable_paging": "OFF",
				"tidb_snapshot":      "2020-01-01 00:00:00",
			},
			expectedParams: map[string]interface{}{
				"tidb_enable_paging": "OFF",
				"tidb_snapshot":      "2020-01-01 00:00:00",
			},
		},
		// Non-TiDB server (MySQL): no TiDB-specific defaults are added.
		{
			si: version.ServerInfo{
				ServerType:    version.ServerTypeMySQL,
				ServerVersion: semver.New("8.0.32"),
			},
			sessionParams:  map[string]interface{}{},
			expectedParams: map[string]interface{}{},
		},
	}

	for _, testCase := range testCases {
		setDefaultSessionParams(testCase.si, testCase.sessionParams)
		require.Equal(t, testCase.expectedParams, testCase.sessionParams)
	}
}
2 changes: 2 additions & 0 deletions dumpling/export/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,8 @@ func buildMockNewRows(mock sqlmock.Sqlmock, columns []string, driverValues [][]d
}

func readRegionCsvDriverValues(t *testing.T) [][]driver.Value {
t.Helper()

csvFilename := "region_results.csv"
file, err := os.Open(csvFilename)
require.NoError(t, err)
Expand Down
25 changes: 13 additions & 12 deletions dumpling/export/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package export
import (
"context"
"database/sql/driver"
"io/ioutil"
"os"
"path"
"sync"
Expand All @@ -31,7 +30,7 @@ func TestWriteDatabaseMeta(t *testing.T) {
_, err = os.Stat(p)
require.NoError(t, err)

bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, "/*!40101 SET NAMES binary*/;\nCREATE DATABASE `test`;\n", string(bytes))
}
Expand All @@ -50,7 +49,7 @@ func TestWritePolicyMeta(t *testing.T) {
_, err = os.Stat(p)
require.NoError(t, err)

bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, "/*!40101 SET NAMES binary*/;\ncreate placement policy `y` followers=2;\n", string(bytes))
}
Expand All @@ -68,7 +67,7 @@ func TestWriteTableMeta(t *testing.T) {
p := path.Join(dir, "test.t-schema.sql")
_, err = os.Stat(p)
require.NoError(t, err)
bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, "/*!40101 SET NAMES binary*/;\nCREATE TABLE t (a INT);\n", string(bytes))
}
Expand All @@ -89,14 +88,14 @@ func TestWriteViewMeta(t *testing.T) {
p := path.Join(dir, "test.v-schema.sql")
_, err = os.Stat(p)
require.NoError(t, err)
bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, specCmt+createTableSQL, string(bytes))

p = path.Join(dir, "test.v-schema-view.sql")
_, err = os.Stat(p)
require.NoError(t, err)
bytes, err = ioutil.ReadFile(p)
bytes, err = os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, specCmt+createViewSQL, string(bytes))
}
Expand Down Expand Up @@ -126,7 +125,7 @@ func TestWriteTableData(t *testing.T) {
p := path.Join(dir, "test.employee.000000000.sql")
_, err = os.Stat(p)
require.NoError(t, err)
bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)

expected := "/*!40101 SET NAMES binary*/;\n" +
Expand Down Expand Up @@ -182,7 +181,7 @@ func TestWriteTableDataWithFileSize(t *testing.T) {
p = path.Join(dir, p)
_, err := os.Stat(p)
require.NoError(t, err)
bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, expected, string(bytes))
}
Expand Down Expand Up @@ -232,7 +231,7 @@ func TestWriteTableDataWithFileSizeAndRows(t *testing.T) {
p = path.Join(dir, p)
_, err = os.Stat(p)
require.NoError(t, err)
bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, expected, string(bytes))
}
Expand Down Expand Up @@ -281,7 +280,7 @@ func TestWriteTableDataWithStatementSize(t *testing.T) {
p = path.Join(config.OutputDirPath, p)
_, err = os.Stat(p)
require.NoError(t, err)
bytes, err1 := ioutil.ReadFile(p)
bytes, err1 := os.ReadFile(p)
require.NoError(t, err1)
require.Equal(t, expected, string(bytes))
}
Expand All @@ -297,7 +296,7 @@ func TestWriteTableDataWithStatementSize(t *testing.T) {
require.NoError(t, err)
err = os.RemoveAll(config.OutputDirPath)
require.NoError(t, err)
config.OutputDirPath, err = ioutil.TempDir("", "dumpling")
config.OutputDirPath, err = os.MkdirTemp("", "dumpling")

writer = createTestWriter(config, t)

Expand All @@ -322,7 +321,7 @@ func TestWriteTableDataWithStatementSize(t *testing.T) {
p = path.Join(config.OutputDirPath, p)
_, err = os.Stat(p)
require.NoError(t, err)
bytes, err := ioutil.ReadFile(p)
bytes, err := os.ReadFile(p)
require.NoError(t, err)
require.Equal(t, expected, string(bytes))
}
Expand All @@ -331,6 +330,8 @@ func TestWriteTableDataWithStatementSize(t *testing.T) {
var mu sync.Mutex

func createTestWriter(conf *Config, t *testing.T) *Writer {
t.Helper()

mu.Lock()
extStore, err := conf.createExternalStorage(context.Background())
mu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ go_library(
"//parser/model",
"//parser/mysql",
"//parser/terror",
"//parser/tidb",
"//parser/types",
"//planner",
"//planner/core",
Expand Down
Loading

0 comments on commit dbac95d

Please sign in to comment.