From 308f1bb4f3d16a1fc4019c0b6b9b78c2de4bb9f7 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 19 Nov 2024 14:38:14 +0800 Subject: [PATCH 1/2] change --- pkg/disttask/framework/scheduler/scheduler.go | 209 +++++++++--------- .../framework/scheduler/scheduler_manager.go | 2 +- .../framework/taskexecutor/manager.go | 6 +- .../framework/taskexecutor/task_executor.go | 3 +- 4 files changed, 116 insertions(+), 104 deletions(-) diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index 43a77c85cd52d..9d2228b96292e 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -91,7 +91,9 @@ type BaseScheduler struct { // NewBaseScheduler creates a new BaseScheduler. func NewBaseScheduler(ctx context.Context, task *proto.Task, param Param) *BaseScheduler { - logger := log.L().With(zap.Int64("task-id", task.ID), zap.Stringer("task-type", task.Type), zap.Bool("allocated-slots", param.allocatedSlots)) + logger := log.L().With(zap.Int64("task-id", task.ID), + zap.Stringer("task-type", task.Type), + zap.Bool("allocated-slots", param.allocatedSlots)) if intest.InTest { logger = logger.With(zap.String("server-id", param.serverID)) } @@ -164,113 +166,114 @@ func (s *BaseScheduler) scheduleTask() { s.logger.Info("schedule task exits") return case <-ticker.C: - err := s.refreshTaskIfNeeded() - if err != nil { - if errors.Cause(err) == storage.ErrTaskNotFound { - // this can happen when task is reverted/succeed, but before - // we reach here, cleanup routine move it to history. - s.logger.Debug("task not found, might be reverted/succeed/failed", zap.Int64("task_id", s.GetTask().ID), - zap.String("task_key", s.GetTask().Key)) - return - } - s.logger.Error("refresh task failed", zap.Error(err)) - continue + } + + err := s.refreshTaskIfNeeded() + if err != nil { + if errors.Cause(err) == storage.ErrTaskNotFound { + // this can happen when task is reverted/succeed, but before + // we reach here, cleanup routine move it to history. + s.logger.Debug("task not found, might be reverted/succeed/failed") + return } - task := *s.GetTask() - // TODO: refine failpoints below. - failpoint.Inject("exitScheduler", func() { - failpoint.Return() - }) - failpoint.Inject("cancelTaskAfterRefreshTask", func(val failpoint.Value) { - if val.(bool) && task.State == proto.TaskStateRunning { - err := s.taskMgr.CancelTask(s.ctx, task.ID) - if err != nil { - s.logger.Error("cancel task failed", zap.Error(err)) - } - } - }) - - failpoint.Inject("pausePendingTask", func(val failpoint.Value) { - if val.(bool) && task.State == proto.TaskStatePending { - _, err := s.taskMgr.PauseTask(s.ctx, task.Key) - if err != nil { - s.logger.Error("pause task failed", zap.Error(err)) - } - task.State = proto.TaskStatePausing - s.task.Store(&task) - } - }) - - failpoint.Inject("pauseTaskAfterRefreshTask", func(val failpoint.Value) { - if val.(bool) && task.State == proto.TaskStateRunning { - _, err := s.taskMgr.PauseTask(s.ctx, task.Key) - if err != nil { - s.logger.Error("pause task failed", zap.Error(err)) - } - task.State = proto.TaskStatePausing - s.task.Store(&task) - } - }) - - switch task.State { - case proto.TaskStateCancelling: - err = s.onCancelling() - case proto.TaskStatePausing: - err = s.onPausing() - case proto.TaskStatePaused: - err = s.onPaused() - // close the scheduler. - if err == nil { - return + s.logger.Error("refresh task failed", zap.Error(err)) + continue + } + task := *s.GetTask() + // TODO: refine failpoints below. + failpoint.Inject("exitScheduler", func() { + failpoint.Return() + }) + failpoint.Inject("cancelTaskAfterRefreshTask", func(val failpoint.Value) { + if val.(bool) && task.State == proto.TaskStateRunning { + err := s.taskMgr.CancelTask(s.ctx, task.ID) + if err != nil { + s.logger.Error("cancel task failed", zap.Error(err)) } - case proto.TaskStateResuming: - // Case with 2 nodes. - // Here is the timeline - // 1. task in pausing state. - // 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots. - // 3. node1's scheduler transfer the node from pausing to paused state. - // 4. resume the task. - // 5. node2 scheduler call refreshTask and get task with resuming state. - if !s.allocatedSlots { - s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State)) - return + } + }) + + failpoint.Inject("pausePendingTask", func(val failpoint.Value) { + if val.(bool) && task.State == proto.TaskStatePending { + _, err := s.taskMgr.PauseTask(s.ctx, task.Key) + if err != nil { + s.logger.Error("pause task failed", zap.Error(err)) } - err = s.onResuming() - case proto.TaskStateReverting: - err = s.onReverting() - case proto.TaskStatePending: - err = s.onPending() - case proto.TaskStateRunning: - // Case with 2 nodes. - // Here is the timeline - // 1. task in pausing state. - // 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots. - // 3. node1's scheduler transfer the node from pausing to paused state. - // 4. resume the task. - // 5. node1 start another scheduler and transfer the node from resuming to running state. - // 6. node2 scheduler call refreshTask and get task with running state. - if !s.allocatedSlots { - s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State)) - return + task.State = proto.TaskStatePausing + s.task.Store(&task) + } + }) + + failpoint.Inject("pauseTaskAfterRefreshTask", func(val failpoint.Value) { + if val.(bool) && task.State == proto.TaskStateRunning { + _, err := s.taskMgr.PauseTask(s.ctx, task.Key) + if err != nil { + s.logger.Error("pause task failed", zap.Error(err)) } - err = s.onRunning() - case proto.TaskStateSucceed, proto.TaskStateReverted, proto.TaskStateFailed: - s.onFinished() + task.State = proto.TaskStatePausing + s.task.Store(&task) + } + }) + + switch task.State { + case proto.TaskStateCancelling: + err = s.onCancelling() + case proto.TaskStatePausing: + err = s.onPausing() + case proto.TaskStatePaused: + err = s.onPaused() + // close the scheduler. + if err == nil { return } - if err != nil { - s.logger.Info("schedule task meet err, reschedule it", zap.Error(err)) + case proto.TaskStateResuming: + // Case with 2 nodes. + // Here is the timeline + // 1. task in pausing state. + // 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots. + // 3. node1's scheduler transfer the node from pausing to paused state. + // 4. resume the task. + // 5. node2 scheduler call refreshTask and get task with resuming state. + if !s.allocatedSlots { + s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State)) + return } - - failpoint.InjectCall("mockOwnerChange") + err = s.onResuming() + case proto.TaskStateReverting: + err = s.onReverting() + case proto.TaskStatePending: + err = s.onPending() + case proto.TaskStateRunning: + // Case with 2 nodes. + // Here is the timeline + // 1. task in pausing state. + // 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots. + // 3. node1's scheduler transfer the node from pausing to paused state. + // 4. resume the task. + // 5. node1 start another scheduler and transfer the node from resuming to running state. + // 6. node2 scheduler call refreshTask and get task with running state. + if !s.allocatedSlots { + s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State)) + return + } + err = s.onRunning() + case proto.TaskStateSucceed, proto.TaskStateReverted, proto.TaskStateFailed: + s.onFinished() + return + } + if err != nil { + s.logger.Info("schedule task meet err, reschedule it", zap.Error(err)) } + + failpoint.InjectCall("mockOwnerChange") } } // handle task in cancelling state, schedule revert subtasks. func (s *BaseScheduler) onCancelling() error { task := s.GetTask() - s.logger.Info("on cancelling state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step))) + s.logger.Info("on cancelling state", zap.Stringer("state", task.State), + zap.String("step", proto.Step2Str(task.Type, task.Step))) return s.revertTask(errors.New(taskCancelMsg)) } @@ -278,7 +281,8 @@ func (s *BaseScheduler) onCancelling() error { // handle task in pausing state, cancel all running subtasks. func (s *BaseScheduler) onPausing() error { task := *s.GetTask() - s.logger.Info("on pausing state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step))) + s.logger.Info("on pausing state", zap.Stringer("state", task.State), + zap.String("step", proto.Step2Str(task.Type, task.Step))) cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step) if err != nil { s.logger.Warn("check task failed", zap.Error(err)) @@ -302,7 +306,8 @@ func (s *BaseScheduler) onPausing() error { // handle task in paused state. func (s *BaseScheduler) onPaused() error { task := s.GetTask() - s.logger.Info("on paused state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step))) + s.logger.Info("on paused state", zap.Stringer("state", task.State), + zap.String("step", proto.Step2Str(task.Type, task.Step))) failpoint.InjectCall("mockDMLExecutionOnPausedState") return nil } @@ -310,7 +315,8 @@ func (s *BaseScheduler) onPaused() error { // handle task in resuming state. func (s *BaseScheduler) onResuming() error { task := *s.GetTask() - s.logger.Info("on resuming state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step))) + s.logger.Info("on resuming state", zap.Stringer("state", task.State), + zap.String("step", proto.Step2Str(task.Type, task.Step))) cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step) if err != nil { s.logger.Warn("check task failed", zap.Error(err)) @@ -333,7 +339,8 @@ func (s *BaseScheduler) onResuming() error { // handle task in reverting state, check all revert subtasks finishes. func (s *BaseScheduler) onReverting() error { task := *s.GetTask() - s.logger.Debug("on reverting state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step))) + s.logger.Debug("on reverting state", zap.Stringer("state", task.State), + zap.String("step", proto.Step2Str(task.Type, task.Step))) cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step) if err != nil { s.logger.Warn("check task failed", zap.Error(err)) @@ -360,7 +367,8 @@ func (s *BaseScheduler) onReverting() error { // handle task in pending state, schedule subtasks. func (s *BaseScheduler) onPending() error { task := s.GetTask() - s.logger.Debug("on pending state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step))) + s.logger.Debug("on pending state", zap.Stringer("state", task.State), + zap.String("step", proto.Step2Str(task.Type, task.Step))) return s.switch2NextStep() } @@ -577,7 +585,8 @@ func generateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.Ser func (s *BaseScheduler) GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error) { previousSubtasks, err := s.taskMgr.GetAllSubtasksByStepAndState(s.ctx, taskID, step, proto.SubtaskStateSucceed) if err != nil { - s.logger.Warn("get previous succeed subtask failed", zap.String("step", proto.Step2Str(s.GetTask().Type, step))) + s.logger.Warn("get previous succeed subtask failed", + zap.String("step", proto.Step2Str(s.GetTask().Type, step))) return nil, err } previousSubtaskMetas := make([][]byte, 0, len(previousSubtasks)) diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go index 0b946111a812c..bd99246a5a61b 100644 --- a/pkg/disttask/framework/scheduler/scheduler_manager.go +++ b/pkg/disttask/framework/scheduler/scheduler_manager.go @@ -153,7 +153,7 @@ func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Mana return schedulerManager } -// Start the schedulerManager, start the scheduleTaskLoop to start multiple schedulers. +// Start the schedulerManager, start the scheduleTask to start multiple schedulers. func (sm *Manager) Start() { // init cached managed nodes sm.nodeMgr.refreshNodes(sm.ctx, sm.taskMgr, sm.slotMgr) diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 460bcb8d0ec2e..f87e98d871357 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -329,7 +329,8 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) { zap.Stringer("type", task.Type), zap.Int("remaining-slots", m.slotManager.availableSlots())) m.executorWG.RunWithLog(func() { defer func() { - m.logger.Info("task executor exit", zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type)) + m.logger.Info("task executor exit", zap.Int64("task-id", task.ID), + zap.Stringer("type", task.Type)) m.slotManager.free(task.ID) m.delTaskExecutor(executor) executor.Close() @@ -381,7 +382,8 @@ func (m *Manager) failSubtask(err error, taskID int64, taskExecutor TaskExecutor return m.taskTable.FailSubtask(m.ctx, m.id, taskID, err) }, "update to subtask failed") if err1 == nil { - m.logger.Error("update error to subtask success", zap.Int64("task-id", taskID), zap.Error(err1), zap.Stack("stack")) + m.logger.Error("update error to subtask success", zap.Int64("task-id", taskID), + zap.Error(err1), zap.Stack("stack")) } } diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 7d9f86e7cdbc4..8fd2045a7753b 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -535,7 +535,8 @@ func (e *BaseTaskExecutor) onError(err error) { } if errors.HasStack(err) { - e.logger.Error("onError", zap.Error(err), zap.Stack("stack"), zap.String("error stack", fmt.Sprintf("%+v", err))) + e.logger.Error("onError", zap.Error(err), zap.Stack("stack"), + zap.String("error stack", fmt.Sprintf("%+v", err))) } else { e.logger.Error("onError", zap.Error(err), zap.Stack("stack")) } From 1613d0fc8900363e82d089d010328f010dba2587 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 19 Nov 2024 15:27:27 +0800 Subject: [PATCH 2/2] change --- pkg/disttask/framework/scheduler/scheduler_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go index bd99246a5a61b..0b946111a812c 100644 --- a/pkg/disttask/framework/scheduler/scheduler_manager.go +++ b/pkg/disttask/framework/scheduler/scheduler_manager.go @@ -153,7 +153,7 @@ func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Mana return schedulerManager } -// Start the schedulerManager, start the scheduleTask to start multiple schedulers. +// Start the schedulerManager, start the scheduleTaskLoop to start multiple schedulers. func (sm *Manager) Start() { // init cached managed nodes sm.nodeMgr.refreshNodes(sm.ctx, sm.taskMgr, sm.slotMgr)