Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support column type change between decimal && SQL mode warnings #20012

Merged
merged 19 commits into from
Sep 29, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comment
Signed-off-by: AilinKid <[email protected]>
  • Loading branch information
AilinKid committed Sep 27, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 8e57623e76939ee57742e18d9a68d777256a284a
78 changes: 42 additions & 36 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ package ddl

import (
"context"
"fmt"
"math"
"strconv"
"sync/atomic"
@@ -63,12 +64,10 @@ type backfiller interface {
}

type backfillResult struct {
addedCount int
scanCount int
nextHandle kv.Handle
err error
warnings []*terror.Error
warningsCount []int64
addedCount int
scanCount int
nextHandle kv.Handle
err error
}

// backfillTaskContext is the context of the batch adding indices or updating column values.
@@ -78,8 +77,8 @@ type backfillTaskContext struct {
done bool
addedCount int
scanCount int
warnings []*terror.Error
warningsCount []int64
warnings map[errors.ErrorID]*terror.Error
warningsCount map[errors.ErrorID]int64
}

type backfillWorker struct {
@@ -153,21 +152,15 @@ func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResu
result.nextHandle = taskCtx.nextHandle
result.addedCount += taskCtx.addedCount
result.scanCount += taskCtx.scanCount
result.warnings, result.warningsCount = mergeWarningsAndWarningsCount(taskCtx.warnings, result.warnings, taskCtx.warningsCount, result.warningsCount)
}

func mergeWarningsAndWarningsCount(partWarnings, totalWarnings []*terror.Error, partWarningsCount, totalWarningsCount []int64) ([]*terror.Error, []int64) {
for i, err1 := range partWarnings {
found := false
for j, err2 := range totalWarnings {
if terror.ErrorEqual(err1, err2) {
totalWarningsCount[j] += partWarningsCount[i]
found = true
}
}
if !found {
totalWarnings = append(totalWarnings, err1)
totalWarningsCount = append(totalWarningsCount, partWarningsCount[i])
func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
for _, warn := range partWarnings {
if _, ok := totalWarningsCount[warn.ID()]; ok {
totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
} else {
totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
totalWarnings[warn.ID()] = warn
}
}
return totalWarnings, totalWarningsCount
@@ -176,7 +169,11 @@ func mergeWarningsAndWarningsCount(partWarnings, totalWarnings []*terror.Error,
// handleBackfillTask backfills range [task.startHandle, task.endHandle) handle's index to table.
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult {
handleRange := *task
result := &backfillResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil}
result := &backfillResult{
err: nil,
addedCount: 0,
nextHandle: handleRange.startHandle,
}
lastLogCount := 0
lastLogTime := time.Now()
startTime := lastLogTime
@@ -200,7 +197,17 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,

bf.AddMetricInfo(float64(taskCtx.addedCount))
mergeBackfillCtxToResult(&taskCtx, result)

// Although `handleRange` covers the data of only one region, the backfill worker still splits it into
// many small slices of reorg batch size and reorganizes each of them in a separate kv txn.
// If a task fails, some of its small kv txns may already have been committed, which means the
// corresponding small ranges have already been reorganized.
// In the next round of reorganization, the target handle range may overlap with those committed
// small ranges, so they will be reorganized again (the `redo` action).
// Therefore the added count and the warnings should be collected from every successfully
// committed small range rather than from the total result.
w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))
w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)

if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
@@ -278,13 +285,11 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startHandle, endH
return ranges, nil
}

func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int, totalAddedCount *int64, startHandle kv.Handle) (kv.Handle, int64, []*terror.Error, []int64, error) {
func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int, totalAddedCount *int64, startHandle kv.Handle) (kv.Handle, int64, error) {
var (
addedCount int64
nextHandle = startHandle
firstErr error
warnings []*terror.Error
warningsCount []int64
addedCount int64
nextHandle = startHandle
firstErr error
)
for i := 0; i < taskCnt; i++ {
worker := workers[i]
@@ -304,11 +309,10 @@ func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int, totalAd
*totalAddedCount += int64(result.addedCount)
addedCount += int64(result.addedCount)
nextHandle = result.nextHandle
warnings, warningsCount = mergeWarningsAndWarningsCount(result.warnings, warnings, result.warningsCount, warningsCount)
}
}

return nextHandle, addedCount, warnings, warningsCount, errors.Trace(firstErr)
return nextHandle, addedCount, errors.Trace(firstErr)
}

// handleReorgTasks sends tasks to workers, and waits for all the running workers to return results,
@@ -321,13 +325,11 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
startHandle := batchTasks[0].startHandle
taskCnt := len(batchTasks)
startTime := time.Now()
nextHandle, taskAddedCount, warnings, warningsCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startHandle)
nextHandle, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startHandle)
elapsedTime := time.Since(startTime)
if err == nil {
err = w.isReorgRunnable(reorgInfo.d)
}
// Partial warnings are cached in reorgCtx temporarily and finally merged into job.warnings.
w.reorgCtx.setWarnings(warnings, warningsCount)

if err != nil {
// Update the reorg handle that has been processed.
@@ -372,8 +374,10 @@ func (w *worker) sendRangeTaskToWorkers(workers []*backfillWorker, reorgInfo *re
physicalTableID := reorgInfo.PhysicalTableID

// Build reorg tasks.
for _, keyRange := range kvRanges {
fmt.Println("kv ranges", len(kvRanges))
for i, keyRange := range kvRanges {
startHandle, endHandle, err := decodeHandleRange(keyRange)
fmt.Println("kv range", i, len(kvRanges))
if err != nil {
return nil, errors.Trace(err)
}
@@ -414,6 +418,8 @@ var (
TestCheckWorkerNumCh = make(chan struct{})
// TestCheckWorkerNumber is used to test adjusting the backfill workers.
TestCheckWorkerNumber = int32(16)
// TestCheckReorgTimeout is used to mock a timeout when reorganizing data.
TestCheckReorgTimeout = int32(0)
)

func loadDDLReorgVars(w *worker) error {
@@ -510,12 +516,12 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

if bfWorkerType == typeAddIndexWorker {
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.SqlMode)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker)
} else {
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.SqlMode)
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker)
16 changes: 12 additions & 4 deletions ddl/column.go
Original file line number Diff line number Diff line change
@@ -918,6 +918,7 @@ func (w *worker) doModifyColumnTypeWithData(
return w.updateColumnAndIndexes(tbl, oldCol, changingCol, changingIdxs, reorgInfo)
})
if err != nil {
fmt.Println("run reorg done", err.Error())
if errWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return ver, nil
@@ -1176,6 +1177,16 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
return errors.Trace(err)
}
}

failpoint.Inject("MockReorgTimeoutInOneRegion", func(val failpoint.Value) {
if val.(bool) {
if handle.IntValue() == 3000 && atomic.LoadInt32(&TestCheckReorgTimeout) == 0 {
atomic.StoreInt32(&TestCheckReorgTimeout, 1)
failpoint.Return(errors.Trace(errWaitReorgTimeout))
}
}
})

w.rowMap[w.newColInfo.ID] = newColVal
newColumnIDs := make([]int64, 0, len(w.rowMap))
newRow := make([]types.Datum, 0, len(w.rowMap))
@@ -1256,10 +1267,7 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
}

// Collect the warnings.
for k, warning := range warningsMap {
taskCtx.warnings = append(taskCtx.warnings, warning)
taskCtx.warningsCount = append(taskCtx.warningsCount, warningsCountMap[k])
}
taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap

return nil
})
69 changes: 69 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ var _ = Suite(&testDBSuite7{&testDBSuite{}})
var _ = SerialSuites(&testSerialDBSuite{&testDBSuite{}})

const defaultBatchSize = 1024
const defaultReorgBatchSize = 256

type testDBSuite struct {
cluster cluster.Cluster
@@ -5805,3 +5806,71 @@ func (s *testSerialDBSuite) TestModifyColumnTypeWithWarnings(c *C) {
"Warning 1690 DECIMAL value is out of range in '(3, 1)'",
"Warning 1690 DECIMAL value is out of range in '(3, 1)'"))
}

// TestModifyColumnTypeWhenInterception tests modifying a column type that produces warnings while the
// reorganization is intercepted in the middle, e.g. by a reorg timeout, a not-owner error and so on.
// The interception is mocked with the `MockReorgTimeoutInOneRegion` failpoint.
func (s *testSerialDBSuite) TestModifyColumnTypeWhenInterception(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	// Enable column change variable, and restore it when the test finishes.
	tk.Se.GetSessionVars().EnableChangeColumnType = true
	defer func() {
		tk.Se.GetSessionVars().EnableChangeColumnType = false
	}()

	// Test normal warnings.
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int primary key, b decimal(4,2))")

	count := defaultBatchSize * 4
	// Add some rows. Every value 11.22 is out of range for the target type decimal(3,1),
	// so each row is expected to produce one warning during the reorganization.
	dml := "insert into t values"
	for i := 1; i <= count; i++ {
		dml += fmt.Sprintf("(%d, %f)", i, 11.22)
		if i != count {
			dml += ","
		}
	}
	tk.MustExec(dml)
	// Make the regions scale like: [1, 1024), [1024, 2048), [2048, 3072), [3072, 4096]
	tk.MustQuery("split table t between(0) and (4096) regions 4").Check(testkit.Rows("4 1"))

	d := s.dom.DDL()
	hook := &ddl.TestDDLCallback{}
	var checkMiddleWarningCount bool
	var checkMiddleAddedCount bool
	// Since the `DefTiDBDDLReorgWorkerCount` is 4, every worker will be assigned one region
	// for the first time. Here we mock the insert failure/reorg timeout in region [2048, 3072),
	// which leads to the next handle being set to 2048 and partial warnings being stored into the ddl job.
	// Because of the reorg batch size, only the last reorg batch [2816, 3072) of kv
	// range [2048, 3072) fails to commit; the rest of them are all committed successfully. So the
	// addedCount and warnings count in the job are both equal to `4096 - reorg batch size`.
	// In the next round of this ddl job, the last reorg batch will be finished.
	var middleWarningsCount = int64(count - defaultReorgBatchSize)
	hook.OnJobUpdatedExported = func(job *model.Job) {
		if job.SchemaState != model.StateWriteReorganization && job.SnapshotVer == 0 {
			return
		}
		// ReorgMeta is guarded against nil in the same way as doDDLJob does, so that a job
		// without reorg meta cannot crash the hook.
		if meta := job.ReorgMeta; meta != nil && len(meta.WarningsCount) == len(meta.Warnings) {
			for _, v := range meta.WarningsCount {
				if v == middleWarningsCount {
					checkMiddleWarningCount = true
				}
			}
		}
		if job.RowCount == middleWarningsCount {
			checkMiddleAddedCount = true
		}
	}
	originHook := d.GetHook()
	d.(ddl.DDLForTest).SetHook(hook)
	defer d.(ddl.DDLForTest).SetHook(originHook)
	c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/MockReorgTimeoutInOneRegion", `return(true)`), IsNil)
	defer func() {
		c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/MockReorgTimeoutInOneRegion"), IsNil)
	}()
	tk.MustExec("alter table t modify column b decimal(3,1)")
	// The intermediate state must have been observed by the hook.
	c.Assert(checkMiddleWarningCount, Equals, true)
	c.Assert(checkMiddleAddedCount, Equals, true)
	// After the job finishes, every row must have reported exactly one warning.
	res := tk.MustQuery("show warnings")
	c.Assert(len(res.Rows()), Equals, count)
}
8 changes: 4 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
@@ -524,12 +524,12 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// If a job is a history job, the state must be JobStateSynced or JobStateRollbackDone or JobStateCancelled.
if historyJob.IsSynced() {
// Judge whether there are some warnings when executing DDL under the certain SQL mode.
if len(historyJob.Warnings) != 0 {
if len(historyJob.Warnings) != len(historyJob.WarningsCount) {
if historyJob.ReorgMeta != nil && len(historyJob.ReorgMeta.Warnings) != 0 {
if len(historyJob.ReorgMeta.Warnings) != len(historyJob.ReorgMeta.WarningsCount) {
logutil.BgLogger().Info("[ddl] DDL warnings doesn't match the warnings count", zap.Int64("jobID", jobID))
} else {
for i, warning := range historyJob.Warnings {
for j := int64(0); j < historyJob.WarningsCount[i]; j++ {
for key, warning := range historyJob.ReorgMeta.Warnings {
for j := int64(0); j < historyJob.ReorgMeta.WarningsCount[key]; j++ {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
17 changes: 13 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
field_types "github.com/pingcap/parser/types"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/placement"
@@ -3644,8 +3645,12 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
SchemaName: schema.Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
SqlMode: ctx.GetSessionVars().SQLMode,
Args: []interface{}{&newCol, originalColName, spec.Position, modifyColumnTp, newAutoRandBits},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
},
Args: []interface{}{&newCol, originalColName, spec.Position, modifyColumnTp, newAutoRandBits},
}
return job, nil
}
@@ -3818,8 +3823,12 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
SchemaName: schema.Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
SqlMode: ctx.GetSessionVars().SQLMode,
Args: []interface{}{&newCol, oldColName, spec.Position, 0},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
},
Args: []interface{}{&newCol, oldColName, spec.Position, 0},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Loading