Skip to content

Commit

Permalink
ddl: eliminate ingest step for add index with local engine (#47982)
Browse files Browse the repository at this point in the history
close #47981
  • Loading branch information
tangenta authored Oct 30, 2023
1 parent f9f6bb3 commit fd3b2cc
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 164 deletions.
2 changes: 0 additions & 2 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"backfilling_dispatcher.go",
"backfilling_dist_scheduler.go",
"backfilling_import_cloud.go",
"backfilling_import_local.go",
"backfilling_merge_sort.go",
"backfilling_operators.go",
"backfilling_proto.go",
Expand Down Expand Up @@ -130,7 +129,6 @@ go_library(
"//pkg/util/collate",
"//pkg/util/dbterror",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/disttask",
"//pkg/util/domainutil",
"//pkg/util/filter",
"//pkg/util/gcutil",
Expand Down
85 changes: 23 additions & 62 deletions pkg/ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

// BackfillingDispatcherExt is an extension of litBackfillDispatcher, exported for test.
type BackfillingDispatcherExt struct {
d *ddl
previousSchedulerIDs []string
GlobalSort bool
d *ddl
GlobalSort bool
}

// NewBackfillingDispatcherExt creates a new backfillingDispatcherExt, only used for test now.
Expand Down Expand Up @@ -97,7 +95,12 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
}
return generateNonPartitionPlan(dsp.d, tblInfo, job)
is, err := dsp.GetEligibleInstances(ctx, gTask)
if err != nil {
return nil, err
}
instanceCnt := len(is)
return generateNonPartitionPlan(dsp.d, tblInfo, job, dsp.GlobalSort, instanceCnt)
case StepMergeSort:
res, err := generateMergePlan(taskHandle, gTask, logger)
if err != nil {
Expand Down Expand Up @@ -135,11 +138,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
prevStep,
logger)
}
// for partition table, no subtasks for write and ingest step.
if tblInfo.Partition != nil {
return nil, nil
}
return generateIngestTaskPlan(ctx, dsp, taskHandle, gTask)
return nil, nil
default:
return nil, nil
}
Expand All @@ -163,7 +162,7 @@ func (dsp *BackfillingDispatcherExt) GetNextStep(task *proto.Task) proto.Step {
if dsp.GlobalSort {
return StepMergeSort
}
return StepWriteAndIngest
return proto.StepDone
case StepMergeSort:
return StepWriteAndIngest
case StepWriteAndIngest:
Expand Down Expand Up @@ -196,21 +195,11 @@ func (*BackfillingDispatcherExt) OnErrStage(_ context.Context, _ dispatcher.Task
}

// GetEligibleInstances implements dispatcher.Extension interface.
func (dsp *BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
func (*BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
serverInfos, err := dispatcher.GenerateSchedulerNodes(ctx)
if err != nil {
return nil, err
}
if len(dsp.previousSchedulerIDs) > 0 {
// Only the nodes that executed step one can have step two.
involvedServerInfos := make([]*infosync.ServerInfo, 0, len(serverInfos))
for _, id := range dsp.previousSchedulerIDs {
if idx := disttaskutil.FindServerInfo(serverInfos, id); idx >= 0 {
involvedServerInfos = append(involvedServerInfos, serverInfos[idx])
}
}
return involvedServerInfos, nil
}
return serverInfos, nil
}

Expand Down Expand Up @@ -286,7 +275,8 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error)
return subTaskMetas, nil
}

func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job) (metas [][]byte, err error) {
func generateNonPartitionPlan(
d *ddl, tblInfo *model.TableInfo, job *model.Job, useCloud bool, instanceCnt int) (metas [][]byte, err error) {
tbl, err := getTable(d.store, job.SchemaID, tblInfo)
if err != nil {
return nil, err
Expand All @@ -309,8 +299,15 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
return nil, err
}

subTaskMetas := make([][]byte, 0, 100)
regionBatch := 20
regionBatch := 100
if !useCloud {
// Make subtask large enough to reduce the overhead of local/global flush.
quota := variable.DDLDiskQuota.Load()
regionBatch = int(int64(quota) / int64(config.SplitRegionSize))
}
regionBatch = min(regionBatch, len(recordRegionMetas)/instanceCnt)

subTaskMetas := make([][]byte, 0, 4)
sort.Slice(recordRegionMetas, func(i, j int) bool {
return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
})
Expand Down Expand Up @@ -341,42 +338,6 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
return subTaskMetas, nil
}

func generateIngestTaskPlan(
ctx context.Context,
h *BackfillingDispatcherExt,
taskHandle dispatcher.TaskHandle,
gTask *proto.Task,
) ([][]byte, error) {
// We dispatch dummy subtasks because the rest data in local engine will be imported
// in the initialization of subtask executor.
var ingestSubtaskCnt int
if intest.InTest && taskHandle == nil {
serverNodes, err := dispatcher.GenerateSchedulerNodes(ctx)
if err != nil {
return nil, err
}
ingestSubtaskCnt = len(serverNodes)
} else {
schedulerIDs, err := taskHandle.GetPreviousSchedulerIDs(ctx, gTask.ID, gTask.Step)
if err != nil {
return nil, err
}
h.previousSchedulerIDs = schedulerIDs
ingestSubtaskCnt = len(schedulerIDs)
}

subTaskMetas := make([][]byte, 0, ingestSubtaskCnt)
dummyMeta := &BackfillSubTaskMeta{}
metaBytes, err := json.Marshal(dummyMeta)
if err != nil {
return nil, err
}
for i := 0; i < ingestSubtaskCnt; i++ {
subTaskMetas = append(subTaskMetas, metaBytes)
}
return subTaskMetas, nil
}

func generateGlobalSortIngestPlan(
ctx context.Context,
taskHandle dispatcher.TaskHandle,
Expand Down
13 changes: 1 addition & 12 deletions pkg/ddl/backfilling_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) {
// 1.2 test partition table OnNextSubtasksBatch after StepReadIndex
gTask.State = proto.TaskStateRunning
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, ddl.StepWriteAndIngest, gTask.Step)
// for partition table, we will not generate subtask for StepWriteAndIngest.
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
require.Len(t, metas, 0)
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, proto.StepDone, gTask.Step)
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
Expand Down Expand Up @@ -123,11 +117,6 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) {
// 2.2.2 StepReadIndex
gTask.State = proto.TaskStateRunning
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, ddl.StepWriteAndIngest, gTask.Step)
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
require.Equal(t, 1, len(metas))
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, proto.StepDone, gTask.Step)
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
Expand Down Expand Up @@ -267,7 +256,7 @@ func TestGetNextStep(t *testing.T) {
ext := &ddl.BackfillingDispatcherExt{}

// 1. local mode
for _, nextStep := range []proto.Step{ddl.StepReadIndex, ddl.StepWriteAndIngest} {
for _, nextStep := range []proto.Step{ddl.StepReadIndex, proto.StepDone} {
require.Equal(t, nextStep, ext.GetNextStep(task))
task.Step = nextStep
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
if len(bgm.CloudStorageURI) > 0 {
return newCloudImportExecutor(&bgm.Job, jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
}
return newImportFromLocalStepExecutor(jobMeta.ID, indexInfos, tbl.(table.PhysicalTable), bc), nil
return nil, errors.Errorf("local import does not have write & ingest step")
default:
return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
}
Expand Down
81 changes: 0 additions & 81 deletions pkg/ddl/backfilling_import_local.go

This file was deleted.

6 changes: 1 addition & 5 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,13 +734,9 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
failpoint.Return(errors.New("mock flush error"))
})
flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota
if s.tbl.GetPartitionedTable() != nil {
flushMode = ingest.FlushModeForceGlobal
}
for _, index := range s.indexes {
idxInfo := index.Meta()
_, _, err := s.backendCtx.Flush(idxInfo.ID, flushMode)
_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import "github.com/pingcap/tidb/pkg/disttask/framework/proto"
// the initial step is StepInit(-1)
// steps are processed in the following order:
// - local sort:
// StepInit -> StepReadIndex -> StepWriteAndIngest -> StepDone
// StepInit -> StepReadIndex -> StepDone
// - global sort:
// StepInit -> StepReadIndex -> StepMergeSort -> StepWriteAndIngest -> StepDone
const (
Expand Down

0 comments on commit fd3b2cc

Please sign in to comment.