dxf: reduce indent of scheduleTask and some log refactor #57501

Merged · 2 commits · Nov 20, 2024
209 changes: 109 additions & 100 deletions pkg/disttask/framework/scheduler/scheduler.go
@@ -91,7 +91,9 @@ type BaseScheduler struct {

 // NewBaseScheduler creates a new BaseScheduler.
 func NewBaseScheduler(ctx context.Context, task *proto.Task, param Param) *BaseScheduler {
-	logger := log.L().With(zap.Int64("task-id", task.ID), zap.Stringer("task-type", task.Type), zap.Bool("allocated-slots", param.allocatedSlots))
+	logger := log.L().With(zap.Int64("task-id", task.ID),
+		zap.Stringer("task-type", task.Type),
+		zap.Bool("allocated-slots", param.allocatedSlots))
 	if intest.InTest {
 		logger = logger.With(zap.String("server-id", param.serverID))
 	}
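For readers less familiar with zap: fields attached via With are emitted on every entry the derived logger writes, which is why later hunks can drop per-call fields (for example the task_id/task_key pair on the "task not found" debug log in the next hunk). A minimal, self-contained sketch; the field values and the zap.NewDevelopment base logger are illustrative stand-ins, not what NewBaseScheduler actually uses:

package main

import "go.uber.org/zap"

func main() {
	// Stand-in for the process-wide logger; NewBaseScheduler derives its
	// logger from log.L() instead.
	base, _ := zap.NewDevelopment()
	defer base.Sync()

	// Child logger carrying task-scoped fields, mirroring the PR's
	// log.L().With(zap.Int64("task-id", ...), ...) call.
	logger := base.With(
		zap.Int64("task-id", 42),
		zap.String("task-type", "example"),
	)

	// Both fields are repeated on this entry automatically, so the call
	// site does not need to re-attach task identifiers.
	logger.Debug("task not found, might be reverted/succeed/failed")
}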
@@ -164,121 +166,123 @@ func (s *BaseScheduler) scheduleTask() {
 			s.logger.Info("schedule task exits")
 			return
 		case <-ticker.C:
Removed (not repeated below): the same loop body nested one level deeper inside the case <-ticker.C: branch of the select, with the "task not found" debug log also carrying task_id and task_key fields. Added:
+		}
+
+		err := s.refreshTaskIfNeeded()

Comment on lines +169 to +171 (Contributor, Author): just move code out of the ticker branch of select

+		if err != nil {
+			if errors.Cause(err) == storage.ErrTaskNotFound {
+				// this can happen when task is reverted/succeed, but before
+				// we reach here, cleanup routine move it to history.
+				s.logger.Debug("task not found, might be reverted/succeed/failed")
+				return
+			}
+			s.logger.Error("refresh task failed", zap.Error(err))
+			continue
+		}
+		task := *s.GetTask()
+		// TODO: refine failpoints below.
+		failpoint.Inject("exitScheduler", func() {
+			failpoint.Return()
+		})
+		failpoint.Inject("cancelTaskAfterRefreshTask", func(val failpoint.Value) {
+			if val.(bool) && task.State == proto.TaskStateRunning {
+				err := s.taskMgr.CancelTask(s.ctx, task.ID)
+				if err != nil {
+					s.logger.Error("cancel task failed", zap.Error(err))
+				}
+			}
+		})
+
+		failpoint.Inject("pausePendingTask", func(val failpoint.Value) {
+			if val.(bool) && task.State == proto.TaskStatePending {
+				_, err := s.taskMgr.PauseTask(s.ctx, task.Key)
+				if err != nil {
+					s.logger.Error("pause task failed", zap.Error(err))
+				}
+				task.State = proto.TaskStatePausing
+				s.task.Store(&task)
+			}
+		})
+
+		failpoint.Inject("pauseTaskAfterRefreshTask", func(val failpoint.Value) {
+			if val.(bool) && task.State == proto.TaskStateRunning {
+				_, err := s.taskMgr.PauseTask(s.ctx, task.Key)
+				if err != nil {
+					s.logger.Error("pause task failed", zap.Error(err))
+				}
+				task.State = proto.TaskStatePausing
+				s.task.Store(&task)
+			}
+		})
+
+		switch task.State {
+		case proto.TaskStateCancelling:
+			err = s.onCancelling()
+		case proto.TaskStatePausing:
+			err = s.onPausing()
+		case proto.TaskStatePaused:
+			err = s.onPaused()
+			// close the scheduler.
+			if err == nil {
+				return
+			}
+		case proto.TaskStateResuming:
+			// Case with 2 nodes.
+			// Here is the timeline
+			// 1. task in pausing state.
+			// 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots.
+			// 3. node1's scheduler transfer the node from pausing to paused state.
+			// 4. resume the task.
+			// 5. node2 scheduler call refreshTask and get task with resuming state.
+			if !s.allocatedSlots {
+				s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State))
+				return
+			}
+			err = s.onResuming()
+		case proto.TaskStateReverting:
+			err = s.onReverting()
+		case proto.TaskStatePending:
+			err = s.onPending()
+		case proto.TaskStateRunning:
+			// Case with 2 nodes.
+			// Here is the timeline
+			// 1. task in pausing state.
+			// 2. node1 and node2 start schedulers with task in pausing state without allocatedSlots.
+			// 3. node1's scheduler transfer the node from pausing to paused state.
+			// 4. resume the task.
+			// 5. node1 start another scheduler and transfer the node from resuming to running state.
+			// 6. node2 scheduler call refreshTask and get task with running state.
+			if !s.allocatedSlots {
+				s.logger.Info("scheduler exit since not allocated slots", zap.Stringer("state", task.State))
+				return
+			}
+			err = s.onRunning()
+		case proto.TaskStateSucceed, proto.TaskStateReverted, proto.TaskStateFailed:
+			s.onFinished()
+			return
+		}
+		if err != nil {
+			s.logger.Info("schedule task meet err, reschedule it", zap.Error(err))
+		}
+
+		failpoint.InjectCall("mockOwnerChange")
 	}
 }
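The review note above captures the core refactor. As a rough, self-contained sketch of the pattern (the names pollLoop and doWork are invented for illustration and are not part of the PR), draining the ticker in an otherwise empty case keeps the per-tick work at loop level, one indent level shallower:

package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop has the same shape the PR gives scheduleTask: the select only
// decides between "context done" and "tick", and the per-tick work lives at
// loop level instead of inside the ticker case.
func pollLoop(ctx context.Context, doWork func() error) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		// Everything below used to sit inside `case <-ticker.C:`.
		if err := doWork(); err != nil {
			fmt.Println("work failed:", err)
			continue
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	pollLoop(ctx, func() error { fmt.Println("tick"); return nil })
}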

// handle task in cancelling state, schedule revert subtasks.
func (s *BaseScheduler) onCancelling() error {
task := s.GetTask()
s.logger.Info("on cancelling state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step)))
s.logger.Info("on cancelling state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))

return s.revertTask(errors.New(taskCancelMsg))
}

// handle task in pausing state, cancel all running subtasks.
func (s *BaseScheduler) onPausing() error {
task := *s.GetTask()
s.logger.Info("on pausing state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step)))
s.logger.Info("on pausing state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
if err != nil {
s.logger.Warn("check task failed", zap.Error(err))
@@ -302,15 +306,17 @@ func (s *BaseScheduler) onPausing() error {
// handle task in paused state.
func (s *BaseScheduler) onPaused() error {
task := s.GetTask()
s.logger.Info("on paused state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step)))
s.logger.Info("on paused state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
failpoint.InjectCall("mockDMLExecutionOnPausedState")
return nil
}

// handle task in resuming state.
func (s *BaseScheduler) onResuming() error {
task := *s.GetTask()
s.logger.Info("on resuming state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step)))
s.logger.Info("on resuming state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
if err != nil {
s.logger.Warn("check task failed", zap.Error(err))
@@ -333,7 +339,8 @@ func (s *BaseScheduler) onResuming() error {
// handle task in reverting state, check all revert subtasks finishes.
func (s *BaseScheduler) onReverting() error {
task := *s.GetTask()
s.logger.Debug("on reverting state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step)))
s.logger.Debug("on reverting state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
if err != nil {
s.logger.Warn("check task failed", zap.Error(err))
@@ -360,7 +367,8 @@ func (s *BaseScheduler) onReverting() error {
// handle task in pending state, schedule subtasks.
func (s *BaseScheduler) onPending() error {
task := s.GetTask()
s.logger.Debug("on pending state", zap.Stringer("state", task.State), zap.String("step", proto.Step2Str(task.Type, task.Step)))
s.logger.Debug("on pending state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
return s.switch2NextStep()
}

@@ -577,7 +585,8 @@ func generateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.Ser
func (s *BaseScheduler) GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error) {
previousSubtasks, err := s.taskMgr.GetAllSubtasksByStepAndState(s.ctx, taskID, step, proto.SubtaskStateSucceed)
if err != nil {
s.logger.Warn("get previous succeed subtask failed", zap.String("step", proto.Step2Str(s.GetTask().Type, step)))
s.logger.Warn("get previous succeed subtask failed",
zap.String("step", proto.Step2Str(s.GetTask().Type, step)))
return nil, err
}
previousSubtaskMetas := make([][]byte, 0, len(previousSubtasks))
6 changes: 4 additions & 2 deletions pkg/disttask/framework/taskexecutor/manager.go
@@ -329,7 +329,8 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) {
zap.Stringer("type", task.Type), zap.Int("remaining-slots", m.slotManager.availableSlots()))
m.executorWG.RunWithLog(func() {
defer func() {
m.logger.Info("task executor exit", zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type))
m.logger.Info("task executor exit", zap.Int64("task-id", task.ID),
zap.Stringer("type", task.Type))
m.slotManager.free(task.ID)
m.delTaskExecutor(executor)
executor.Close()
@@ -381,7 +382,8 @@ func (m *Manager) failSubtask(err error, taskID int64, taskExecutor TaskExecutor
return m.taskTable.FailSubtask(m.ctx, m.id, taskID, err)
}, "update to subtask failed")
if err1 == nil {
m.logger.Error("update error to subtask success", zap.Int64("task-id", taskID), zap.Error(err1), zap.Stack("stack"))
m.logger.Error("update error to subtask success", zap.Int64("task-id", taskID),
zap.Error(err1), zap.Stack("stack"))
}
}

3 changes: 2 additions & 1 deletion pkg/disttask/framework/taskexecutor/task_executor.go
@@ -535,7 +535,8 @@ func (e *BaseTaskExecutor) onError(err error) {
}

if errors.HasStack(err) {
e.logger.Error("onError", zap.Error(err), zap.Stack("stack"), zap.String("error stack", fmt.Sprintf("%+v", err)))
e.logger.Error("onError", zap.Error(err), zap.Stack("stack"),
zap.String("error stack", fmt.Sprintf("%+v", err)))
} else {
e.logger.Error("onError", zap.Error(err), zap.Stack("stack"))
}
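For the onError hunk above, a standalone sketch of why both zap.Stack and the formatted error are logged, assuming (as the surrounding file appears to) the github.com/pingcap/errors package, whose errors record a stack trace at creation time:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// errors.New from pingcap/errors records a stack trace where the error
	// is created, typically far from where it is finally logged.
	err := errors.New("boom")

	if errors.HasStack(err) {
		// zap.Stack captures the stack of this logging call site, while
		// "%+v" prints the stack stored inside the error itself.
		logger.Error("onError", zap.Error(err), zap.Stack("stack"),
			zap.String("error stack", fmt.Sprintf("%+v", err)))
	} else {
		logger.Error("onError", zap.Error(err), zap.Stack("stack"))
	}
}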