Skip to content

Commit

Permalink
disttask: add load data scheduler and subtask executor (#42881)
Browse files Browse the repository at this point in the history
close #42880
  • Loading branch information
GMHDBJD authored Apr 14, 2023
1 parent 2d0564c commit 2062b68
Show file tree
Hide file tree
Showing 23 changed files with 602 additions and 689 deletions.
2 changes: 1 addition & 1 deletion ddl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (b *backfillSchedulerHandle) InitSubtaskExecEnv(context.Context) error {
}

// SplitSubtask implements the Scheduler interface.
func (b *backfillSchedulerHandle) SplitSubtask(subtask []byte) ([]proto.MinimalTask, error) {
func (b *backfillSchedulerHandle) SplitSubtask(_ context.Context, subtask []byte) ([]proto.MinimalTask, error) {
logutil.BgLogger().Info("[ddl] lightning split subtask")

d := b.d
Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (t *testScheduler) CleanupSubtaskExecEnv(_ context.Context) error { return

// Rollback is a no-op for the test scheduler: it ignores the context
// and always reports success.
func (t *testScheduler) Rollback(_ context.Context) error {
	return nil
}

func (t *testScheduler) SplitSubtask(subtask []byte) ([]proto.MinimalTask, error) {
func (t *testScheduler) SplitSubtask(_ context.Context, subtask []byte) ([]proto.MinimalTask, error) {
return []proto.MinimalTask{
testMiniTask{},
testMiniTask{},
Expand Down
6 changes: 5 additions & 1 deletion disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ type InternalScheduler interface {
// Scheduler defines the interface of a scheduler.
// User should implement this interface to define their own scheduler.
type Scheduler interface {
// InitSubtaskExecEnv is used to initialize the environment for the subtask executor.
InitSubtaskExecEnv(context.Context) error
SplitSubtask(subtask []byte) ([]proto.MinimalTask, error)
// SplitSubtask is used to split the subtask into multiple minimal tasks.
SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error)
// CleanupSubtaskExecEnv is used to clean up the environment for the subtask executor.
CleanupSubtaskExecEnv(context.Context) error
// Rollback is used to roll back all subtasks.
Rollback(context.Context) error
}

Expand Down
5 changes: 4 additions & 1 deletion disttask/framework/scheduler/interface_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ func (m *MockScheduler) InitSubtaskExecEnv(ctx context.Context) error {
}

// SplitSubtask implements Scheduler.SplitSubtask.
func (m *MockScheduler) SplitSubtask(subtask []byte) ([]proto.MinimalTask, error) {
func (m *MockScheduler) SplitSubtask(_ context.Context, subtask []byte) ([]proto.MinimalTask, error) {
args := m.Called(subtask)
if args.Error(1) != nil {
return nil, args.Error(1)
}
return args.Get(0).([]proto.MinimalTask), nil
}

Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
break
}

minimalTasks, err := scheduler.SplitSubtask(subtask.Meta)
minimalTasks, err := scheduler.SplitSubtask(context.Background(), subtask.Meta)
if err != nil {
s.onError(err)
break
Expand Down
10 changes: 5 additions & 5 deletions disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
err = scheduler.Run(runCtx, &proto.Task{Type: tp, ID: taskID, Concurrency: concurrency})
Expand All @@ -105,7 +105,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand All @@ -117,7 +117,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateSucceed).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(nil, nil).Once()
Expand All @@ -130,7 +130,7 @@ func TestSchedulerRun(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(context.Canceled).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateCanceled).Return(nil).Once()
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestScheduler(t *testing.T) {
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}).Once()
mockScheduler.On("SplitSubtask", mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskState", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand Down
38 changes: 8 additions & 30 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,51 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "loaddata",
srcs = [
"dispatcher.go",
"proto.go",
"scheduler.go",
"subtask_executor.go",
"wrapper.go",
],
importpath = "github.com/pingcap/tidb/disttask/loaddata",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/config",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/mydump",
"//br/pkg/storage",
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//disttask/framework/scheduler",
"//executor/importer",
"//parser/model",
"//parser/mysql",
"//util/intest",
"//table/tables",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "loaddata_test",
timeout = "short",
srcs = [
"dispatcher_test.go",
"wrapper_test.go",
],
data = glob(["testdata/**"]),
embed = [":loaddata"],
flaky = True,
shard_count = 4,
deps = [
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//executor/importer",
"//parser/model",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
],
)
74 changes: 45 additions & 29 deletions disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@ import (
"context"
"encoding/json"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/disttask/framework/dispatcher"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// FlowHandle is the dispatcher for load data.
// It has no fields; its ProcessNormalFlow method implements the
// dispatcher.TaskFlowHandle interface.
type FlowHandle struct{}

// ProcessNormalFlow implements dispatcher.TaskFlowHandle interface.
func (*FlowHandle) ProcessNormalFlow(ctx context.Context, dispatch dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
func (*FlowHandle) ProcessNormalFlow(ctx context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
taskMeta := &TaskMeta{}
err := json.Unmarshal(gTask.Meta, taskMeta)
if err != nil {
Expand All @@ -44,16 +47,16 @@ func (*FlowHandle) ProcessNormalFlow(ctx context.Context, dispatch dispatcher.Ta
default:
}

schedulers, err := dispatch.GetAllSchedulerIDs(ctx, gTask.ID)
if err != nil {
return nil, err
}
subtaskMetas, err := generateSubtaskMetas(ctx, taskMeta, len(schedulers))
// schedulers, err := dispatch.GetAllSchedulerIDs(ctx, gTask.ID)
// if err != nil {
// return nil, err
// }
subtaskMetas, err := generateSubtaskMetas(ctx, taskMeta)
if err != nil {
return nil, err
}
logutil.BgLogger().Info("generate subtasks", zap.Any("subtask_metas", subtaskMetas))
metaBytes := make([][]byte, 0, len(taskMeta.FileInfos))
metaBytes := make([][]byte, 0, len(subtaskMetas))
for _, subtaskMeta := range subtaskMetas {
bs, err := json.Marshal(subtaskMeta)
if err != nil {
Expand All @@ -71,33 +74,46 @@ func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, _
return nil, nil
}

func generateSubtaskMetas(ctx context.Context, task *TaskMeta, concurrency int) ([]*SubtaskMeta, error) {
tableRegions, err := makeTableRegions(ctx, task, concurrency)
func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) ([]*SubtaskMeta, error) {
idAlloc := kv.NewPanickingAllocators(0)
tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo)
if err != nil {
return nil, err
}
controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl)
if err != nil {
return nil, err
}
if err := controller.InitDataFiles(ctx); err != nil {
return nil, err
}

subtaskMetaMap := make(map[int32]*SubtaskMeta)
for _, region := range tableRegions {
subtaskMeta, ok := subtaskMetaMap[region.EngineID]
if !ok {
subtaskMeta = &SubtaskMeta{
Table: task.Table,
Format: task.Format,
Dir: task.Dir,
}
subtaskMetaMap[region.EngineID] = subtaskMeta
tableImporter, err := importer.NewTableImporter(&importer.JobImportParam{
GroupCtx: ctx,
}, controller)
if err != nil {
return nil, err
}

engineCheckpoints, err := tableImporter.PopulateChunks(ctx)
if err != nil {
return nil, err
}
subtaskMetas := make([]*SubtaskMeta, 0, len(engineCheckpoints))
for id, ecp := range engineCheckpoints {
if id == common.IndexEngineID {
continue
}
subtaskMeta := &SubtaskMeta{
ID: id,
Plan: taskMeta.Plan,
}
for _, chunkCheckpoint := range ecp.Chunks {
subtaskMeta.Chunks = append(subtaskMeta.Chunks, toChunk(*chunkCheckpoint))
}
subtaskMeta.Chunks = append(subtaskMeta.Chunks, Chunk{
Path: region.FileMeta.Path,
Offset: region.Chunk.Offset,
EndOffset: region.Chunk.EndOffset,
RealOffset: region.Chunk.RealOffset,
PrevRowIDMax: region.Chunk.PrevRowIDMax,
RowIDMax: region.Chunk.RowIDMax,
})
subtaskMetas = append(subtaskMetas, subtaskMeta)
}
return maps.Values[map[int32]*SubtaskMeta](subtaskMetaMap), nil
return subtaskMetas, nil
}

func init() {
Expand Down
119 changes: 0 additions & 119 deletions disttask/loaddata/dispatcher_test.go

This file was deleted.

Loading

0 comments on commit 2062b68

Please sign in to comment.