Skip to content

Commit

Permalink
dxf: merge OnFinished into RunSubtask (#58098)
Browse files Browse the repository at this point in the history
ref #57497
  • Loading branch information
D3Hunter authored Dec 10, 2024
1 parent 68ac9ec commit 3adc71c
Show file tree
Hide file tree
Showing 14 changed files with 42 additions and 168 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ mock_lightning: mockgen

.PHONY: gen_mock
gen_mock: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -destination pkg/disttask/framework/scheduler/mock/scheduler_mock.go -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension
tools/bin/mockgen -embed -package mockexecute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
Expand Down
5 changes: 0 additions & 5 deletions pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,3 @@ func (m *cloudImportExecutor) Cleanup(ctx context.Context) error {
ingest.LitBackCtxMgr.Unregister(m.job.ID)
return nil
}

func (*cloudImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error {
logutil.Logger(ctx).Info("cloud import executor finish subtask")
return nil
}
8 changes: 6 additions & 2 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
memSizePerCon := res.Mem.Capacity() / int64(subtask.Concurrency)
partSize := max(external.MinUploadPartSize, memSizePerCon*int64(external.MaxMergingFilesPerThread)/10000)

return external.MergeOverlappingFiles(
err = external.MergeOverlappingFiles(
ctx,
sm.DataFiles,
store,
Expand All @@ -100,14 +100,18 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
subtask.Concurrency,
true,
)
if err != nil {
return errors.Trace(err)
}
return m.onFinished(ctx, subtask)
}

func (*mergeSortExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("merge cleanup subtask exec env")
return nil
}

func (m *mergeSortExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error {
func (m *mergeSortExecutor) onFinished(ctx context.Context, subtask *proto.Subtask) error {
logutil.Logger(ctx).Info("merge sort finish subtask")
sm, err := decodeBackfillSubTaskMeta(subtask.Meta)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
if err = executeAndClosePipeline(opCtx, pipe, nil, nil, 0); err != nil {
return errors.Trace(err)
}
return r.onFinished(ctx, subtask)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
Expand All @@ -131,7 +134,11 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
}
return err
}
return r.bc.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
err = r.bc.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
if err != nil {
return errors.Trace(err)
}
return r.onFinished(ctx, subtask)
}

func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
Expand All @@ -147,7 +154,7 @@ func (r *readIndexExecutor) Cleanup(ctx context.Context) error {
return nil
}

func (r *readIndexExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error {
func (r *readIndexExecutor) onFinished(ctx context.Context, subtask *proto.Subtask) error {
failpoint.InjectCall("mockDMLExecutionAddIndexSubTaskFinish")
if len(r.cloudStorageURI) == 0 {
return nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/disttask/framework/integrationtests/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ func TestFrameworkCancelTask(t *testing.T) {

registerExampleTask(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/beforeCallOnSubtaskFinished",
func(subtask *proto.Subtask) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(e taskexecutor.TaskExecutor, _ *error) {
if counter.Add(1) == 1 {
require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, subtask.TaskID))
require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, e.GetTaskBase().ID))
}
},
)
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {
registerExampleTask(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
t.Run("meet cancel on run subtask", func(t *testing.T) {
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/changeRunSubtaskError",
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(e taskexecutor.TaskExecutor, errP *error) {
if counter.Add(1) == 1 {
e.CancelRunningSubtask()
Expand All @@ -258,7 +258,7 @@ func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {

t.Run("meet some error on run subtask", func(t *testing.T) {
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/changeRunSubtaskError",
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterRunSubtask",
func(_ taskexecutor.TaskExecutor, errP *error) {
if counter.Add(1) == 1 {
*errP = errors.New("MockExecutorRunErr")
Expand Down
14 changes: 0 additions & 14 deletions pkg/disttask/framework/mock/execute/execute_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 2 additions & 70 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions pkg/disttask/framework/taskexecutor/execute/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,13 @@ type StepExecutor interface {
// subtask as failed, to trigger task failure.
Init(context.Context) error
// RunSubtask is used to run the subtask.
// The subtask meta can be updated in place, if no error returned, the subtask
// meta will be updated in the task table.
RunSubtask(ctx context.Context, subtask *proto.Subtask) error

// RealtimeSummary returns the realtime summary of the running subtask by this executor.
RealtimeSummary() *SubtaskSummary

// OnFinished is used to handle the subtask when it is finished.
// The subtask meta can be updated in place. only when OnFinished returns no
// err, a subtask can be marked as 'success', if it returns error, the subtask
// might be completely rerun, so don't put code that's prone to error in it.
// TODO merge with RunSubtask, seems no need to have a separate API.
OnFinished(ctx context.Context, subtask *proto.Subtask) error
// Cleanup is used to clean up the environment for this step.
// the returned error will not affect task/subtask state, it's only logged,
// so don't put code that's prone to error in it.
Expand Down
12 changes: 0 additions & 12 deletions pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ type TaskTable interface {
RunningSubtasksBack2Pending(ctx context.Context, subtasks []*proto.SubtaskBase) error
}

// Pool defines the interface of a pool.
type Pool interface {
Run(func()) error
RunWithConcurrency(chan func(), uint32) error
ReleaseAndWait()
}

// TaskExecutor is the executor for a task.
// Each task type should implement this interface.
// context tree of task execution:
Expand Down Expand Up @@ -149,8 +142,3 @@ func (*EmptyStepExecutor) RealtimeSummary() *execute.SubtaskSummary {
func (*EmptyStepExecutor) Cleanup(context.Context) error {
return nil
}

// OnFinished implements the StepExecutor interface.
func (*EmptyStepExecutor) OnFinished(_ context.Context, _ *proto.Subtask) error {
return nil
}
7 changes: 1 addition & 6 deletions pkg/disttask/framework/taskexecutor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
}()
return e.stepExec.RunSubtask(e.stepCtx, subtask)
}()
failpoint.InjectCall("changeRunSubtaskError", e, &subtaskErr)
failpoint.InjectCall("afterRunSubtask", e, &subtaskErr)
logTask.End2(zap.InfoLevel, subtaskErr)

failpoint.InjectCall("mockTiDBShutdown", e, e.id, e.GetTaskBase())
Expand All @@ -392,11 +392,6 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
return subtaskErr
}

failpoint.InjectCall("beforeCallOnSubtaskFinished", subtask)
if err := e.stepExec.OnFinished(e.stepCtx, subtask); err != nil {
logger.Info("OnFinished failed", zap.Error(err))
return errors.Trace(err)
}
err := e.finishSubtask(e.stepCtx, subtask)
failpoint.InjectCall("syncAfterSubtaskFinish")
return err
Expand Down
35 changes: 0 additions & 35 deletions pkg/disttask/framework/taskexecutor/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ func TestTaskExecutorRun(t *testing.T) {
e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true)
} else {
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil)
}
}
Expand Down Expand Up @@ -299,7 +298,6 @@ func TestTaskExecutorRun(t *testing.T) {
e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), nextSubtask.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), nextSubtask).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), nextSubtask).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", nextSubtask.ID, gomock.Any()).Return(nil)
// exit
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil)
Expand All @@ -318,7 +316,6 @@ func TestTaskExecutorRun(t *testing.T) {
e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(1), gomock.Any()).Return(nil)
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil)
e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil)
Expand All @@ -344,7 +341,6 @@ func TestTaskExecutorRun(t *testing.T) {
}
e.taskTable.EXPECT().StartSubtask(gomock.Any(), subtaskID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), theSubtask).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), theSubtask).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", subtaskID, gomock.Any()).Return(nil)
}
// exit due to no subtask to run for a while
Expand Down Expand Up @@ -392,7 +388,6 @@ func TestTaskExecutorRun(t *testing.T) {
}
e.taskTable.EXPECT().StartSubtask(gomock.Any(), subtaskID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), theSubtask).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), theSubtask).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", subtaskID, gomock.Any()).Return(nil)
}
}
Expand All @@ -413,7 +408,6 @@ func TestTaskExecutorRun(t *testing.T) {
e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", int64(1), gomock.Any()).Return(nil)
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil)
step2Subtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{
Expand All @@ -426,7 +420,6 @@ func TestTaskExecutorRun(t *testing.T) {
e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), step2Subtask.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), step2Subtask).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), step2Subtask).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", step2Subtask.ID, gomock.Any()).Return(nil)
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil)
e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(errors.New("some error 2"))
Expand Down Expand Up @@ -462,7 +455,6 @@ func TestTaskExecutorRun(t *testing.T) {
// first round of the run loop
e.taskExecExt.EXPECT().IsIdempotent(gomock.Any()).Return(true)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", subtaskID, gomock.Any()).Return(nil)
// second round of the run loop
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil)
Expand Down Expand Up @@ -539,32 +531,6 @@ func TestTaskExecutorRun(t *testing.T) {
e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil)
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil)
e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil)
e.taskExecutor.Run(nil)
require.True(t, e.ctrl.Satisfied())
})

t.Run("OnFinished failed for task, will run again", func(t *testing.T) {
e := newTaskExecutorRunEnv(t)
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil)
e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil)
e.taskExecExt.EXPECT().GetStepExecutor(gomock.Any()).Return(e.stepExecutor, nil)
e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(errors.New("some error"))
// second round of the run loop
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil)
e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil)
e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), e.pendingSubtask1).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), e.pendingSubtask1).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil)
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil)
e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil)
Expand Down Expand Up @@ -595,7 +561,6 @@ func TestTaskExecutorRun(t *testing.T) {
e.stepExecutor.EXPECT().Init(gomock.Any()).Return(nil)
e.taskTable.EXPECT().StartSubtask(gomock.Any(), e.pendingSubtask1.ID, "id").Return(nil)
e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil)
e.stepExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil)
e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", e.pendingSubtask1.ID, gomock.Any()).Return(nil)
// loop for 8 times without subtask, and exit
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil).Times(8)
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/testutil/disttest_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func GetCommonStepExecutor(ctrl *gomock.Controller, step proto.Step, runSubtaskF
executor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(runSubtaskFn).AnyTimes()
executor.EXPECT().GetStep().Return(step).AnyTimes()
executor.EXPECT().Cleanup(gomock.Any()).Return(nil).AnyTimes()
executor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
executor.EXPECT().RealtimeSummary().Return(nil).AnyTimes()
return executor
}
Expand Down
Loading

0 comments on commit 3adc71c

Please sign in to comment.