Skip to content

Commit

Permalink
disttask: init node meta inside domain (#49996)
Browse files Browse the repository at this point in the history
close #49990
  • Loading branch information
D3Hunter authored Jan 3, 2024
1 parent 3831054 commit 49dfd93
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 36 deletions.
1 change: 0 additions & 1 deletion pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_test(
":handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/testkit",
"//pkg/util/backoff",
"@com_github_ngaut_pools//:pools",
Expand Down
3 changes: 0 additions & 3 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/stretchr/testify/require"
Expand All @@ -53,8 +52,6 @@ func TestHandle(t *testing.T) {
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)

testutil.WaitNodeRegistered(ctx, t)

// no scheduler registered
task, err := handle.SubmitTask(ctx, "1", proto.TaskTypeExample, 2, proto.EmptyMeta)
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/planner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ go_test(
":planner",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/kv",
"//pkg/testkit",
"@com_github_ngaut_pools//:pools",
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/planner"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand All @@ -46,7 +45,6 @@ func TestPlanner(t *testing.T) {
defer pool.Close()
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)
testutil.WaitNodeRegistered(ctx, t)
p := &planner.Planner{}
pCtx := planner.PlanCtx{
Ctx: ctx,
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
Expand All @@ -46,7 +45,6 @@ func TestCleanUpRoutine(t *testing.T) {
mockCleanupRoutine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sch.Start()
defer sch.Stop()
testutil.WaitNodeRegistered(ctx, t)
taskID, err := mgr.CreateTask(ctx, "test", proto.TaskTypeExample, 1, nil)
require.NoError(t, err)

Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ func TestTaskFailInManager(t *testing.T) {
schManager.Start()
defer schManager.Stop()

testutil.WaitNodeRegistered(ctx, t)

// unknown task type
taskID, err := mgr.CreateTask(ctx, "test", "test-type", 1, nil)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 11,
shard_count = 12,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
Expand Down
10 changes: 6 additions & 4 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,15 @@ func (m *Manager) initMeta() (err error) {
return err
}

// InitMeta initializes the meta of the Manager.
// not a must-success step before start manager, manager will try to init meta periodically.
func (m *Manager) InitMeta() error {
return m.initMeta()
}

// Start starts the Manager.
func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
if err := m.initMeta(); err != nil {
return err
}

m.wg.Run(m.fetchAndHandleRunnableTasksLoop)
m.wg.Run(m.fetchAndFastCancelTasksLoop)
m.wg.Run(m.recoverMetaLoop)
Expand Down
38 changes: 38 additions & 0 deletions pkg/disttask/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func TestManager(t *testing.T) {
wg.Wait()
})

require.NoError(t, m.InitMeta())
require.NoError(t, m.Start())
time.Sleep(5 * time.Second)
m.Stop()
Expand Down Expand Up @@ -463,3 +464,40 @@ func TestSlotManagerInManager(t *testing.T) {
require.Equal(t, 0, len(m.slotManager.executorTasks))
require.True(t, ctrl.Satisfied())
}

func TestManagerInitMeta(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockTaskTable := mock.NewMockTaskTable(ctrl)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m := &Manager{
taskTable: mockTaskTable,
ctx: ctx,
logCtx: ctx,
}
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
require.NoError(t, m.InitMeta())
require.True(t, ctrl.Satisfied())
gomock.InOrder(
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")),
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil),
)
require.NoError(t, m.InitMeta())
require.True(t, ctrl.Satisfied())

bak := retrySQLTimes
t.Cleanup(func() {
retrySQLTimes = bak
})
retrySQLTimes = 1
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err"))
require.ErrorContains(t, m.InitMeta(), "mock err")
require.True(t, ctrl.Satisfied())

cancel()
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err"))
require.ErrorIs(t, m.InitMeta(), context.Canceled)
require.True(t, ctrl.Satisfied())
}
15 changes: 0 additions & 15 deletions pkg/disttask/framework/testutil/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -54,7 +52,6 @@ func InitTestContext(t *testing.T, nodeNum int) (context.Context, *gomock.Contro
})

executionContext := testkit.NewDistExecutionContext(t, nodeNum)
WaitNodeRegistered(ctx, t)
testCtx := &TestContext{
subtasksHasRun: make(map[string]map[int64]struct{}),
}
Expand Down Expand Up @@ -86,15 +83,3 @@ func (c *TestContext) CollectedSubtaskCnt(taskID int64, step proto.Step) int {
func getTaskStepKey(id int64, step proto.Step) string {
return fmt.Sprintf("%d/%d", id, step)
}

// WaitNodeRegistered waits until some node is registered.
func WaitNodeRegistered(ctx context.Context, t *testing.T) {
// wait until some node is registered.
require.Eventually(t, func() bool {
taskMgr, err := storage.GetTaskManager()
require.NoError(t, err)
nodes, err := taskMgr.GetAllNodes(ctx)
require.NoError(t, err)
return len(nodes) > 0
}, 5*time.Second, 100*time.Millisecond)
}
15 changes: 10 additions & 5 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,11 @@ func (do *Domain) InitDistTaskLoop() error {
}

storage.SetTaskManager(taskManager)
if err = executorManager.InitMeta(); err != nil {
// executor manager loop will try to recover meta repeatedly, so we can
// just log the error here.
logutil.BgLogger().Warn("init task executor manager meta failed", zap.Error(err))
}
do.wg.Run(func() {
defer func() {
storage.SetTaskManager(nil)
Expand All @@ -1510,7 +1515,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
}()

var schedulerManager *scheduler.Manager
startDispatchIfNeeded := func() {
startSchedulerMgrIfNeeded := func() {
if schedulerManager != nil && schedulerManager.Initialized() {
return
}
Expand All @@ -1522,7 +1527,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
}
schedulerManager.Start()
}
stopDispatchIfNeeded := func() {
stopSchedulerMgrIfNeeded := func() {
if schedulerManager != nil && schedulerManager.Initialized() {
logutil.BgLogger().Info("stopping dist task scheduler manager because the current node is not DDL owner anymore", zap.String("id", do.ddl.GetID()))
schedulerManager.Stop()
Expand All @@ -1534,13 +1539,13 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
for {
select {
case <-do.exit:
stopDispatchIfNeeded()
stopSchedulerMgrIfNeeded()
return
case <-ticker.C:
if do.ddl.OwnerManager().IsOwner() {
startDispatchIfNeeded()
startSchedulerMgrIfNeeded()
} else {
stopDispatchIfNeeded()
stopSchedulerMgrIfNeeded()
}
}
}
Expand Down

0 comments on commit 49dfd93

Please sign in to comment.