Skip to content

Commit

Permalink
dxf: handle modifying task concurrency in scheduler (#57673)
Browse files Browse the repository at this point in the history
ref #57497
  • Loading branch information
D3Hunter authored Dec 2, 2024
1 parent 6aac3e8 commit e631ba8
Show file tree
Hide file tree
Showing 14 changed files with 450 additions and 11 deletions.
3 changes: 2 additions & 1 deletion pkg/disttask/framework/integrationtests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ go_test(
"framework_scope_test.go",
"framework_test.go",
"main_test.go",
"modify_test.go",
"resource_control_test.go",
],
flaky = True,
race = "off",
shard_count = 22,
shard_count = 23,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
224 changes: 224 additions & 0 deletions pkg/disttask/framework/integrationtests/modify_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integrationtests

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/stretchr/testify/require"
)

// TestModifyTaskConcurrency verifies that changing a task's concurrency from 3
// to 7 through TaskMgr.ModifyTaskByID is picked up by the scheduler when the
// task is pending, running (at step two), or paused, and also when another
// owner has already applied the modification via TaskMgr.ModifiedTask.
//
// Every example task has two steps with one subtask each. A subtask blocks
// until it receives from subtaskCh (or its context is cancelled), so each send
// on subtaskCh lets exactly one subtask finish.
func TestModifyTaskConcurrency(t *testing.T) {
	c := testutil.NewTestDXFContext(t, 1, 16, true)
	schedulerExt := testutil.GetMockSchedulerExt(c.MockCtrl, testutil.SchedulerInfo{
		AllErrorRetryable: true,
		StepInfos: []testutil.StepInfo{
			{Step: proto.StepOne, SubtaskCnt: 1},
			{Step: proto.StepTwo, SubtaskCnt: 1},
		},
	})
	// Unbuffered on purpose: the test controls when each subtask completes.
	subtaskCh := make(chan struct{})
	registerExampleTask(t, c.MockCtrl, schedulerExt, c.TestContext,
		func(ctx context.Context, subtask *proto.Subtask) error {
			select {
			case <-subtaskCh:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		},
	)

	t.Run("modify pending task concurrency", func(t *testing.T) {
		var once sync.Once
		modifySyncCh := make(chan struct{})
		var theTask *proto.Task
		// Submit and modify the task inside the failpoint, i.e. before the
		// manager fetches schedulable tasks, so the task is still pending.
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
			once.Do(func() {
				task, err := handle.SubmitTask(c.Ctx, "k1", proto.TaskTypeExample, 3, "", nil)
				require.NoError(t, err)
				require.Equal(t, 3, task.Concurrency)
				require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{
					PrevState: proto.TaskStatePending,
					Modifications: []proto.Modification{
						{Type: proto.ModifyConcurrency, To: 7},
					},
				}))
				theTask = task
				// The request only moves the task to 'modifying'; the
				// concurrency itself is still the original one.
				gotTask, err := c.TaskMgr.GetTaskBaseByID(c.Ctx, theTask.ID)
				require.NoError(t, err)
				require.Equal(t, proto.TaskStateModifying, gotTask.State)
				require.Equal(t, 3, gotTask.Concurrency)
				<-modifySyncCh
			})
		})
		// Rendezvous with the failpoint; theTask is set once this send returns.
		modifySyncCh <- struct{}{}
		// finish subtasks
		subtaskCh <- struct{}{}
		subtaskCh <- struct{}{}
		task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key)
		require.Equal(t, proto.TaskStateSucceed, task2Base.State)
		checkSubtaskConcurrency(t, c, theTask.ID, map[proto.Step]int{
			proto.StepOne: 7,
			proto.StepTwo: 7,
		})
	})

	t.Run("modify running task concurrency at step two", func(t *testing.T) {
		var once sync.Once
		modifySyncCh := make(chan struct{})
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask", func(task *proto.Task) {
			// Only act when the task is running AND already at step two. Using
			// '&&' here would let the hook fire while the task is running at
			// step one, so the modification could race with the step-one
			// subtask and break the "StepOne: 3" expectation below.
			if task.State != proto.TaskStateRunning || task.Step != proto.StepTwo {
				return
			}
			once.Do(func() {
				require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{
					PrevState: proto.TaskStateRunning,
					Modifications: []proto.Modification{
						{Type: proto.ModifyConcurrency, To: 7},
					},
				}))
				<-modifySyncCh
			})
		})
		task, err := handle.SubmitTask(c.Ctx, "k2", proto.TaskTypeExample, 3, "", nil)
		require.NoError(t, err)
		require.Equal(t, 3, task.Concurrency)
		// finish StepOne
		subtaskCh <- struct{}{}
		// wait task move to 'modifying' state
		modifySyncCh <- struct{}{}
		// wait task move back to 'running' state
		require.Eventually(t, func() bool {
			gotTask, err2 := c.TaskMgr.GetTaskByID(c.Ctx, task.ID)
			require.NoError(t, err2)
			return gotTask.State == proto.TaskStateRunning
		}, 10*time.Second, 100*time.Millisecond)
		// finish StepTwo
		subtaskCh <- struct{}{}
		task2Base := testutil.WaitTaskDone(c.Ctx, t, task.Key)
		require.Equal(t, proto.TaskStateSucceed, task2Base.State)
		// StepOne ran with the original concurrency, StepTwo with the new one.
		checkSubtaskConcurrency(t, c, task.ID, map[proto.Step]int{
			proto.StepOne: 3,
			proto.StepTwo: 7,
		})
	})

	t.Run("modify paused task concurrency", func(t *testing.T) {
		var once sync.Once
		syncCh := make(chan struct{})
		var theTask *proto.Task
		// Submit the task and request a pause before the manager fetches
		// schedulable tasks.
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
			once.Do(func() {
				task, err := handle.SubmitTask(c.Ctx, "k3", proto.TaskTypeExample, 3, "", nil)
				require.NoError(t, err)
				require.Equal(t, 3, task.Concurrency)
				found, err := c.TaskMgr.PauseTask(c.Ctx, task.Key)
				require.NoError(t, err)
				require.True(t, found)
				theTask = task
				<-syncCh
			})
		})
		syncCh <- struct{}{}
		taskBase := testutil.WaitTaskDoneOrPaused(c.Ctx, t, theTask.Key)
		require.Equal(t, proto.TaskStatePaused, taskBase.State)
		require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, theTask.ID, &proto.ModifyParam{
			PrevState: proto.TaskStatePaused,
			Modifications: []proto.Modification{
				{Type: proto.ModifyConcurrency, To: 7},
			},
		}))
		// After the modification is handled the task must be paused again.
		taskBase = testutil.WaitTaskDoneOrPaused(c.Ctx, t, theTask.Key)
		require.Equal(t, proto.TaskStatePaused, taskBase.State)
		found, err := c.TaskMgr.ResumeTask(c.Ctx, theTask.Key)
		require.NoError(t, err)
		require.True(t, found)
		// finish subtasks
		subtaskCh <- struct{}{}
		subtaskCh <- struct{}{}
		task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key)
		require.Equal(t, proto.TaskStateSucceed, task2Base.State)
		checkSubtaskConcurrency(t, c, theTask.ID, map[proto.Step]int{
			proto.StepOne: 7,
			proto.StepTwo: 7,
		})
	})

	t.Run("modify pending task concurrency, but other owner already done it", func(t *testing.T) {
		var once sync.Once
		modifySyncCh := make(chan struct{})
		var theTask *proto.Task
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
			once.Do(func() {
				task, err := handle.SubmitTask(c.Ctx, "k4", proto.TaskTypeExample, 3, "", nil)
				require.NoError(t, err)
				require.Equal(t, 3, task.Concurrency)
				require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{
					PrevState: proto.TaskStatePending,
					Modifications: []proto.Modification{
						{Type: proto.ModifyConcurrency, To: 7},
					},
				}))
				theTask = task
				gotTask, err := c.TaskMgr.GetTaskBaseByID(c.Ctx, theTask.ID)
				require.NoError(t, err)
				require.Equal(t, proto.TaskStateModifying, gotTask.State)
				require.Equal(t, 3, gotTask.Concurrency)
			})
		})
		var onceForRefresh sync.Once
		// Right after the scheduler refreshed the task (still 'modifying'),
		// apply the modification directly through the task manager so that
		// the scheduler's own attempt finds it already done.
		testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/afterRefreshTask",
			func(task *proto.Task) {
				onceForRefresh.Do(func() {
					require.Equal(t, proto.TaskStateModifying, task.State)
					taskClone := *task
					taskClone.Concurrency = 7
					require.NoError(t, c.TaskMgr.ModifiedTask(c.Ctx, &taskClone))
					gotTask, err := c.TaskMgr.GetTaskBaseByID(c.Ctx, task.ID)
					require.NoError(t, err)
					require.Equal(t, proto.TaskStatePending, gotTask.State)
					<-modifySyncCh
				})
			},
		)
		modifySyncCh <- struct{}{}
		// finish subtasks
		subtaskCh <- struct{}{}
		subtaskCh <- struct{}{}
		task2Base := testutil.WaitTaskDone(c.Ctx, t, theTask.Key)
		require.Equal(t, proto.TaskStateSucceed, task2Base.State)
		checkSubtaskConcurrency(t, c, theTask.ID, map[proto.Step]int{
			proto.StepOne: 7,
			proto.StepTwo: 7,
		})
	})
}

// checkSubtaskConcurrency asserts that, for every step listed in
// expectedStepCon, GetSubtasksWithHistory returns exactly one subtask of the
// given task and that this subtask's concurrency equals the expected value.
func checkSubtaskConcurrency(t *testing.T, c *testutil.TestDXFContext, taskID int64, expectedStepCon map[proto.Step]int) {
	for step := range expectedStepCon {
		wantCon := expectedStepCon[step]
		got, err := c.TaskMgr.GetSubtasksWithHistory(c.Ctx, taskID, step)
		require.NoError(t, err)
		// Exactly one subtask is expected, which also makes got[0] safe below.
		require.Len(t, got, 1)
		require.Equal(t, wantCon, got[0].Concurrency)
	}
}
14 changes: 14 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions pkg/disttask/framework/proto/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package proto

import "fmt"

// ModificationType is the type of task modification.
// It is a string-based type that tells which attribute of a task a
// Modification changes.
type ModificationType string

Expand All @@ -33,8 +35,18 @@ type ModifyParam struct {
Modifications []Modification `json:"modifications"`
}

// String implements fmt.Stringer interface.
// It renders the previous task state followed by the requested modifications.
func (p *ModifyParam) String() string {
	const layout = "{prev_state: %s, modifications: %v}"
	return fmt.Sprintf(layout, p.PrevState, p.Modifications)
}

// Modification is one modification for task.
type Modification struct {
// Type tells which kind of modification this is.
Type ModificationType `json:"type"`
// To is the target value of the modification.
To int64 `json:"to"`
}

// String implements fmt.Stringer interface.
// It renders the modification type and its target value.
func (m Modification) String() string {
	const layout = "{type: %s, to: %d}"
	return fmt.Sprintf(layout, m.Type, m.To)
}
1 change: 1 addition & 0 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type Task struct {
// changed in below case, and framework will update the task meta in the storage.
// - task switches to next step in Scheduler.OnNextSubtasksBatch
// - on task cleanup, we might do some redaction on the meta.
// - on task 'modifying', params inside the meta can be changed.
Meta []byte
Error error
ModifyParam ModifyParam
Expand Down
6 changes: 5 additions & 1 deletion pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ type TaskManager interface {
RevertedTask(ctx context.Context, taskID int64) error
// PauseTask updated task state to pausing.
PauseTask(ctx context.Context, taskKey string) (bool, error)
// PausedTask updated task state to paused.
// PausedTask updated task state to 'paused'.
PausedTask(ctx context.Context, taskID int64) error
// ResumedTask updated task state from resuming to running.
ResumedTask(ctx context.Context, taskID int64) error
// ModifiedTask tries to update the task's concurrency and meta, and to move
// its state back to the previous state. On success, it also updates the
// concurrency of all active subtasks.
ModifiedTask(ctx context.Context, task *proto.Task) error
// SucceedTask updates a task to success state.
SucceedTask(ctx context.Context, taskID int64) error
// SwitchTaskStep switches the task to the next step and add subtasks in one
Expand Down
29 changes: 26 additions & 3 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,32 @@ func (s *BaseScheduler) onRunning() error {

// onModifying is called when task is in modifying state.
// the first return value indicates whether the scheduler should be recreated.
func (*BaseScheduler) onModifying() (bool, error) {
// TODO: implement me
panic("implement me")
func (s *BaseScheduler) onModifying() (bool, error) {
	// All changes are made on the task obtained from getTaskClone; it is only
	// stored back into the scheduler after ModifiedTask has succeeded.
	task := s.getTaskClone()
	s.logger.Info("on modifying state", zap.Stringer("param", &task.ModifyParam))
	needRecreate := false
	for _, mod := range task.ModifyParam.Modifications {
		if mod.Type != proto.ModifyConcurrency {
			// will implement other modification types later.
			s.logger.Warn("unsupported modification type", zap.Stringer("type", mod.Type))
			continue
		}
		if task.Concurrency == int(mod.To) {
			// shouldn't happen normally.
			s.logger.Info("task concurrency not changed, skip", zap.Int("concurrency", task.Concurrency))
			continue
		}
		s.logger.Info("modify task concurrency", zap.Int("from", task.Concurrency), zap.Int64("to", mod.To))
		needRecreate = true
		task.Concurrency = int(mod.To)
	}
	// ModifiedTask is called even when no modification was applied above.
	err := s.taskMgr.ModifiedTask(s.ctx, task)
	if err != nil {
		return false, errors.Trace(err)
	}
	// Mirror the change in memory: restore the previous state and clear the
	// handled modify param.
	task.State = task.ModifyParam.PrevState
	task.ModifyParam = proto.ModifyParam{}
	s.task.Store(task)
	return needRecreate, nil
}

func (s *BaseScheduler) onFinished() {
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (sm *Manager) scheduleTaskLoop() {
continue
}

failpoint.InjectCall("beforeGetSchedulableTasks")
schedulableTasks, err := sm.getSchedulableTasks()
if err != nil {
continue
Expand Down
26 changes: 26 additions & 0 deletions pkg/disttask/framework/scheduler/scheduler_nokit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,30 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
})

// Exercises onModifying on a task in 'modifying' state whose ModifyParam
// requests concurrency 123 with 'running' as the previous state.
t.Run("test on modifying", func(t *testing.T) {
taskBefore := schTask
taskBefore.State = proto.TaskStateModifying
taskBefore.ModifyParam = proto.ModifyParam{
PrevState: proto.TaskStateRunning,
Modifications: []proto.Modification{
{Type: proto.ModifyConcurrency, To: 123},
},
}
scheduler.task.Store(&taskBefore)
// First call: ModifiedTask fails, so the error is returned and no
// scheduler recreation is requested.
taskMgr.EXPECT().ModifiedTask(gomock.Any(), gomock.Any()).Return(fmt.Errorf("modify err"))
recreateScheduler, err := scheduler.onModifying()
require.ErrorContains(t, err, "modify err")
require.False(t, recreateScheduler)

// Second call: ModifiedTask succeeds, so recreation is requested because
// the concurrency changed.
taskMgr.EXPECT().ModifiedTask(gomock.Any(), gomock.Any()).Return(nil)
recreateScheduler, err = scheduler.onModifying()
require.NoError(t, err)
require.True(t, recreateScheduler)
// The in-memory task must carry the new concurrency, be back in the
// previous state, and have its ModifyParam cleared.
expectedTask := taskBefore
expectedTask.Concurrency = 123
expectedTask.State = proto.TaskStateRunning
expectedTask.ModifyParam = proto.ModifyParam{}
require.Equal(t, *scheduler.GetTask(), expectedTask)
})
}
Loading

0 comments on commit e631ba8

Please sign in to comment.