Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#53548
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
D3Hunter authored and ti-chi-bot committed Jul 30, 2024
1 parent cfd1d3a commit 8d5a329
Show file tree
Hide file tree
Showing 29 changed files with 964 additions and 310 deletions.
37 changes: 26 additions & 11 deletions pkg/autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,9 @@ func newWithCli(selfAddr string, cli *clientv3.Client, store kv.Storage) *Servic
leaderShip: l,
store: store,
}
l.SetBeOwnerHook(func() {
// Reset the map to avoid a case that a node lose leadership and regain it, then
// improperly use the stale map to serve the autoid requests.
// See https://github.com/pingcap/tidb/issues/52600
service.autoIDLock.Lock()
clear(service.autoIDMap)
service.autoIDLock.Unlock()

logutil.BgLogger().Info("leader change of autoid service, this node become owner",
zap.String("addr", selfAddr),
zap.String("category", "autoid service"))
l.SetListener(&ownerListener{
Service: service,
selfAddr: selfAddr,
})
// 10 means that autoid service's etcd lease is 10s.
err := l.CampaignOwner(10)
Expand Down Expand Up @@ -580,6 +572,29 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
return &autoid.RebaseResponse{}, nil
}

type ownerListener struct {
*Service
selfAddr string
}

var _ owner.Listener = (*ownerListener)(nil)

func (l *ownerListener) OnBecomeOwner() {
// Reset the map to avoid a case that a node lose leadership and regain it, then
// improperly use the stale map to serve the autoid requests.
// See https://github.com/pingcap/tidb/issues/52600
l.autoIDLock.Lock()
clear(l.autoIDMap)
l.autoIDLock.Unlock()

logutil.BgLogger().Info("leader change of autoid service, this node become owner",
zap.String("addr", l.selfAddr),
zap.String("category", "autoid service"))
}

func (*ownerListener) OnRetireOwner() {
}

func init() {
autoid1.MockForTest = MockForTest
}
17 changes: 17 additions & 0 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,20 @@ func SetBackfillTaskChanSizeForTest(n int) {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
<<<<<<< HEAD
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

=======
func (dc *ddlCtx) writePhysicalTableRecord(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
bfWorkerType backfillerType,
reorgInfo *reorgInfo,
) error {
>>>>>>> 04c66ee9508 (ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes (#53548))
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey

if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
Expand All @@ -697,8 +707,15 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
})

jc := reorgInfo.NewJobContext()
<<<<<<< HEAD
sessCtx := newContext(reorgInfo.d.store)
scheduler, err := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc)
=======

eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)

scheduler, err := newBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
>>>>>>> 04c66ee9508 (ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes (#53548))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(
jobMeta := &s.taskMeta.Job
ddlObj := s.d

// TODO getTableByTxn is using DDL ctx which is never cancelled except when shutdown.
// we should move this operation out of GetStepExecutor, and put into Init.
_, tblIface, err := ddlObj.getTableByTxn((*asAutoIDRequirement)(ddlObj.ddlCtx), jobMeta.SchemaID, jobMeta.TableID)
if err != nil {
return nil, err
Expand Down
15 changes: 8 additions & 7 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/backoff"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
Expand Down Expand Up @@ -89,7 +88,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
return nil, err
}
job := &backfillMeta.Job
tblInfo, err := getTblInfo(sch.d, job)
tblInfo, err := getTblInfo(ctx, sch.d, job)
if err != nil {
return nil, err
}
Expand All @@ -101,7 +100,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
}
return generateNonPartitionPlan(sch.d, tblInfo, job, sch.GlobalSort, len(execIDs))
return generateNonPartitionPlan(ctx, sch.d, tblInfo, job, sch.GlobalSort, len(execIDs))
case proto.BackfillStepMergeSort:
return generateMergePlan(taskHandle, task, logger)
case proto.BackfillStepWriteAndIngest:
Expand Down Expand Up @@ -201,8 +200,8 @@ func (sch *LitBackfillScheduler) Close() {
sch.BaseScheduler.Close()
}

func getTblInfo(d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
err = kv.RunInNewTxn(d.ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
return err
})
Expand Down Expand Up @@ -242,11 +241,13 @@ const (
)

func generateNonPartitionPlan(
ctx context.Context,
d *ddl,
tblInfo *model.TableInfo,
job *model.Job,
useCloud bool,
instanceCnt int) (metas [][]byte, err error) {
instanceCnt int,
) (metas [][]byte, err error) {
tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo)
if err != nil {
return nil, err
Expand All @@ -267,7 +268,7 @@ func generateNonPartitionPlan(

subTaskMetas := make([][]byte, 0, 4)
backoffer := backoff.NewExponential(scanRegionBackoffBase, 2, scanRegionBackoffMax)
err = handle.RunWithRetry(d.ctx, 8, backoffer, tidblogutil.Logger(d.ctx), func(_ context.Context) (bool, error) {
err = handle.RunWithRetry(ctx, 8, backoffer, logutil.DDLLogger(), func(_ context.Context) (bool, error) {
regionCache := d.store.(helper.Storage).GetRegionCache()
recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/ddl/backfilling_dist_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task.Step = sch.GetNextStep(&task.TaskBase)
require.Equal(t, proto.BackfillStepReadIndex, task.Step)
execIDs := []string{":4000"}
metas, err := sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
ctx := util.WithInternalSourceType(context.Background(), "backfill")
metas, err := sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
for i, par := range tblInfo.Partition.Definitions {
Expand All @@ -73,7 +74,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task.State = proto.TaskStateRunning
task.Step = sch.GetNextStep(&task.TaskBase)
require.Equal(t, proto.StepDone, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Len(t, metas, 0)

Expand All @@ -85,7 +86,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
// 2.1 empty table
tk.MustExec("create table t1(id int primary key, v int)")
task = createAddIndexTask(t, dom, "test", "t1", proto.Backfill, false)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 0, len(metas))
// 2.2 non empty table.
Expand All @@ -97,15 +98,15 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task = createAddIndexTask(t, dom, "test", "t2", proto.Backfill, false)
// 2.2.1 stepInit
task.Step = sch.GetNextStep(&task.TaskBase)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 1, len(metas))
require.Equal(t, proto.BackfillStepReadIndex, task.Step)
// 2.2.2 BackfillStepReadIndex
task.State = proto.TaskStateRunning
task.Step = sch.GetNextStep(&task.TaskBase)
require.Equal(t, proto.StepDone, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 0, len(metas))
}
Expand Down
32 changes: 16 additions & 16 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ func checkSystemSchemaID(t *meta.Meta, schemaID int64, flashbackTSString string)
return nil
}

func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, se, flashbackTS); err != nil {
func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(ctx, se, flashbackTS); err != nil {
return err
}

Expand All @@ -231,13 +231,13 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M
if err = closePDSchedule(); err != nil {
return err
}
if err = setTiDBEnableAutoAnalyze(d.ctx, se, variable.Off); err != nil {
if err = setTiDBEnableAutoAnalyze(ctx, se, variable.Off); err != nil {
return err
}
if err = setTiDBSuperReadOnly(d.ctx, se, variable.On); err != nil {
if err = setTiDBSuperReadOnly(ctx, se, variable.On); err != nil {
return err
}
if err = setTiDBTTLJobEnable(d.ctx, se, variable.Off); err != nil {
if err = setTiDBTTLJobEnable(ctx, se, variable.Off); err != nil {
return err
}

Expand All @@ -256,12 +256,12 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M

// Check if there is an upgrade during [flashbackTS, now)
sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString)
rows, err := sess.NewSession(se).Execute(d.ctx, sql, "check_tidb_server_version")
rows, err := sess.NewSession(se).Execute(ctx, sql, "check_tidb_server_version")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback")
}
sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0))
rows, err = sess.NewSession(se).Execute(d.ctx, sql, "check_tidb_server_version")
rows, err = sess.NewSession(se).Execute(ctx, sql, "check_tidb_server_version")
if err != nil {
return errors.Trace(err)
}
Expand All @@ -271,7 +271,7 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M

// Check is there a DDL task at flashbackTS.
sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString)
rows, err = sess.NewSession(se).Execute(d.ctx, sql, "check_history_job")
rows, err = sess.NewSession(se).Execute(ctx, sql, "check_history_job")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history ddl jobs failed, can't do flashback")
}
Expand Down Expand Up @@ -609,12 +609,12 @@ func flashbackToVersion(
).RunOnRange(ctx, startKey, endKey)
}

func splitRegionsByKeyRanges(d *ddlCtx, keyRanges []kv.KeyRange) {
func splitRegionsByKeyRanges(ctx context.Context, d *ddlCtx, keyRanges []kv.KeyRange) {
if s, ok := d.store.(kv.SplittableStore); ok {
for _, keys := range keyRanges {
for {
// tableID is useless when scatter == false
_, err := s.SplitRegions(d.ctx, [][]byte{keys.StartKey, keys.EndKey}, false, nil)
_, err := s.SplitRegions(ctx, [][]byte{keys.StartKey, keys.EndKey}, false, nil)
if err == nil {
break
}
Expand Down Expand Up @@ -696,12 +696,12 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// We should get startTS here to avoid lost startTS when TiDB crashed during send prepare flashback RPC.
startTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
startTS, err = d.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -722,10 +722,10 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return updateSchemaVersion(d, t, job)
}
// Split region by keyRanges, make sure no unrelated key ranges be locked.
splitRegionsByKeyRanges(d, keyRanges)
splitRegionsByKeyRanges(w.ctx, d, keyRanges)
totalRegions.Store(0)
for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
if err = flashbackToVersion(w.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
Expand All @@ -738,7 +738,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.Args[totalLockedRegionsOffset] = totalRegions.Load()

// We should get commitTS here to avoid lost commitTS when TiDB crashed during send flashback RPC.
commitTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err = d.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -756,7 +756,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}

for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
if err = flashbackToVersion(w.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use same startTS as prepare phase to simulate 1PC txn.
stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, commitTS, r)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
// https://github.com/pingcap/tidb/issues/38297
return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.")
}
err := w.writePhysicalTableRecord(w.sessPool, p, workType, reorgInfo)
err := w.writePhysicalTableRecord(w.ctx, w.sessPool, p, workType, reorgInfo)
if err != nil {
return err
}
Expand All @@ -1105,7 +1105,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
return w.writePhysicalTableRecord(w.ctx, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
Expand Down
Loading

0 comments on commit 8d5a329

Please sign in to comment.