Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: persist subtask error properly #46674

Merged
merged 3 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type TaskTable interface {
GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)

GetSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
StartSubtask(id int64) error
UpdateSubtaskStateAndError(id int64, state string, err error) error
FinishSubtask(id int64, meta []byte) error
xhebox marked this conversation as resolved.
Show resolved Hide resolved
StartSubtask(subtaskID int64) error
UpdateSubtaskStateAndError(subtaskID int64, state string, err error) error
FinishSubtask(subtaskID int64, meta []byte) error
HasSubtasksInStates(instanceID string, taskID int64, step int64, states ...interface{}) (bool, error)
UpdateErrorToSubtask(tidbID string, err error) error
IsSchedulerCanceled(taskID int64, execID string) (bool, error)
UpdateErrorToSubtask(instanceID string, taskID int64, err error) error
IsSchedulerCanceled(taskID int64, instanceID string) (bool, error)
}

// Pool defines the interface of a pool.
Expand Down
6 changes: 3 additions & 3 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *InternalSchedulerImpl) Run(ctx context.Context, task *proto.Task) error
if s.mu.handled {
return err
}
return s.taskTable.UpdateErrorToSubtask(s.id, err)
return s.taskTable.UpdateErrorToSubtask(s.id, task.ID, err)
}

func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error {
Expand Down Expand Up @@ -477,8 +477,8 @@ func (s *InternalSchedulerImpl) startSubtask(id int64) {
}
}

func (s *InternalSchedulerImpl) updateSubtaskStateAndError(id int64, state string, subTaskErr error) {
err := s.taskTable.UpdateSubtaskStateAndError(id, state, subTaskErr)
func (s *InternalSchedulerImpl) updateSubtaskStateAndError(subtaskID int64, state string, subTaskErr error) {
err := s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr)
if err != nil {
s.onError(err)
}
Expand Down
4 changes: 2 additions & 2 deletions disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func TestSchedulerRun(t *testing.T) {
// UpdateErrorToSubtask won't return such errors, but since the error is not handled,
// it's saved by UpdateErrorToSubtask.
// here we use this to check the returned error of s.run.
forwardErrFn := func(_ string, err error) error {
forwardErrFn := func(_ string, _ int64, err error) error {
return err
}
mockSubtaskTable.EXPECT().UpdateErrorToSubtask(gomock.Any(), gomock.Any()).DoAndReturn(forwardErrFn).AnyTimes()
mockSubtaskTable.EXPECT().UpdateErrorToSubtask(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(forwardErrFn).AnyTimes()
err := scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp})
require.EqualError(t, err, schedulerRegisterErr.Error())

Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestSubTaskTable(t *testing.T) {
// test UpdateErrorToSubtask do update start/update time
err = sm.AddNewSubTask(3, proto.StepInit, "for_test", []byte("test"), proto.TaskTypeExample, false)
require.NoError(t, err)
require.NoError(t, sm.UpdateErrorToSubtask("for_test", errors.New("fail")))
require.NoError(t, sm.UpdateErrorToSubtask("for_test", 3, errors.New("fail")))
subtask, err = sm.GetSubtaskInStates("for_test", 3, proto.StepInit, proto.TaskStateFailed)
require.NoError(t, err)
require.Equal(t, proto.TaskStateFailed, subtask.State)
Expand Down
10 changes: 5 additions & 5 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,14 @@ func (stm *TaskManager) GetSubtaskInStates(tidbID string, taskID int64, step int
}

// UpdateErrorToSubtask updates the error to subtask.
func (stm *TaskManager) UpdateErrorToSubtask(tidbID string, err error) error {
func (stm *TaskManager) UpdateErrorToSubtask(tidbID string, taskID int64, err error) error {
if err == nil {
return nil
}
_, err1 := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask
set state = %?, error = %?, start_time = unix_timestamp(), state_update_time = unix_timestamp()
where exec_id = %? and state = %? limit 1;`,
proto.TaskStateFailed, serializeErr(err), tidbID, proto.TaskStatePending)
where exec_id = %? and task_key = %? and state = %? limit 1;`,
proto.TaskStateFailed, serializeErr(err), tidbID, taskID, proto.TaskStatePending)
return err1
}

Expand Down Expand Up @@ -469,11 +469,11 @@ func (stm *TaskManager) HasSubtasksInStates(tidbID string, taskID int64, step in
}

// StartSubtask updates the subtask state to running.
func (stm *TaskManager) StartSubtask(id int64) error {
func (stm *TaskManager) StartSubtask(subtaskID int64) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask
set state = %?, start_time = unix_timestamp(), state_update_time = unix_timestamp()
where id = %?`,
proto.TaskStateRunning, id)
proto.TaskStateRunning, subtaskID)
return err
}

Expand Down