Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: init node meta inside domain #49996

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_test(
":handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/testkit",
"//pkg/util/backoff",
"@com_github_ngaut_pools//:pools",
Expand Down
3 changes: 0 additions & 3 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/stretchr/testify/require"
Expand All @@ -53,8 +52,6 @@ func TestHandle(t *testing.T) {
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)

testutil.WaitNodeRegistered(ctx, t)

// no scheduler registered
task, err := handle.SubmitTask(ctx, "1", proto.TaskTypeExample, 2, proto.EmptyMeta)
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/planner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ go_test(
":planner",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/kv",
"//pkg/testkit",
"@com_github_ngaut_pools//:pools",
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/planner"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand All @@ -46,7 +45,6 @@ func TestPlanner(t *testing.T) {
defer pool.Close()
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)
testutil.WaitNodeRegistered(ctx, t)
p := &planner.Planner{}
pCtx := planner.PlanCtx{
Ctx: ctx,
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
Expand All @@ -46,7 +45,6 @@ func TestCleanUpRoutine(t *testing.T) {
mockCleanupRoutine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sch.Start()
defer sch.Stop()
testutil.WaitNodeRegistered(ctx, t)
taskID, err := mgr.CreateTask(ctx, "test", proto.TaskTypeExample, 1, nil)
require.NoError(t, err)

Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ func TestTaskFailInManager(t *testing.T) {
schManager.Start()
defer schManager.Stop()

testutil.WaitNodeRegistered(ctx, t)

// unknown task type
taskID, err := mgr.CreateTask(ctx, "test", "test-type", 1, nil)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 11,
shard_count = 12,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
Expand Down
10 changes: 6 additions & 4 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,15 @@ func (m *Manager) initMeta() (err error) {
return err
}

// InitMeta initializes the meta of the Manager.
// not a must-success step before start manager, manager will try to init meta periodically.
func (m *Manager) InitMeta() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just rename initMeta to InitMeta?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's named Init in the begin, but it's misleading to Start even Init failed, so renamed to InitMeta

will leave it for now

return m.initMeta()
}

// Start starts the Manager.
func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
if err := m.initMeta(); err != nil {
return err
}

m.wg.Run(m.fetchAndHandleRunnableTasksLoop)
m.wg.Run(m.fetchAndFastCancelTasksLoop)
m.wg.Run(m.recoverMetaLoop)
Expand Down
38 changes: 38 additions & 0 deletions pkg/disttask/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func TestManager(t *testing.T) {
wg.Wait()
})

require.NoError(t, m.InitMeta())
require.NoError(t, m.Start())
time.Sleep(5 * time.Second)
m.Stop()
Expand Down Expand Up @@ -463,3 +464,40 @@ func TestSlotManagerInManager(t *testing.T) {
require.Equal(t, 0, len(m.slotManager.executorTasks))
require.True(t, ctrl.Satisfied())
}

func TestManagerInitMeta(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockTaskTable := mock.NewMockTaskTable(ctrl)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m := &Manager{
taskTable: mockTaskTable,
ctx: ctx,
logCtx: ctx,
}
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
require.NoError(t, m.InitMeta())
require.True(t, ctrl.Satisfied())
gomock.InOrder(
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err")),
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil),
)
require.NoError(t, m.InitMeta())
require.True(t, ctrl.Satisfied())

bak := retrySQLTimes
t.Cleanup(func() {
retrySQLTimes = bak
})
retrySQLTimes = 1
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err"))
require.ErrorContains(t, m.InitMeta(), "mock err")
require.True(t, ctrl.Satisfied())

cancel()
mockTaskTable.EXPECT().StartManager(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err"))
require.ErrorIs(t, m.InitMeta(), context.Canceled)
require.True(t, ctrl.Satisfied())
}
15 changes: 0 additions & 15 deletions pkg/disttask/framework/testutil/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -54,7 +52,6 @@ func InitTestContext(t *testing.T, nodeNum int) (context.Context, *gomock.Contro
})

executionContext := testkit.NewDistExecutionContext(t, nodeNum)
WaitNodeRegistered(ctx, t)
testCtx := &TestContext{
subtasksHasRun: make(map[string]map[int64]struct{}),
}
Expand Down Expand Up @@ -86,15 +83,3 @@ func (c *TestContext) CollectedSubtaskCnt(taskID int64, step proto.Step) int {
func getTaskStepKey(id int64, step proto.Step) string {
return fmt.Sprintf("%d/%d", id, step)
}

// WaitNodeRegistered waits until some node is registered.
func WaitNodeRegistered(ctx context.Context, t *testing.T) {
// wait until some node is registered.
require.Eventually(t, func() bool {
taskMgr, err := storage.GetTaskManager()
require.NoError(t, err)
nodes, err := taskMgr.GetAllNodes(ctx)
require.NoError(t, err)
return len(nodes) > 0
}, 5*time.Second, 100*time.Millisecond)
}
15 changes: 10 additions & 5 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,11 @@ func (do *Domain) InitDistTaskLoop() error {
}

storage.SetTaskManager(taskManager)
if err = executorManager.InitMeta(); err != nil {
// executor manager loop will try to recover meta repeatedly, so we can
// just log the error here.
logutil.BgLogger().Warn("init task executor manager meta failed", zap.Error(err))
}
do.wg.Run(func() {
defer func() {
storage.SetTaskManager(nil)
Expand All @@ -1510,7 +1515,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
}()

var schedulerManager *scheduler.Manager
startDispatchIfNeeded := func() {
startSchedulerMgrIfNeeded := func() {
if schedulerManager != nil && schedulerManager.Initialized() {
return
}
Expand All @@ -1522,7 +1527,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
}
schedulerManager.Start()
}
stopDispatchIfNeeded := func() {
stopSchedulerMgrIfNeeded := func() {
if schedulerManager != nil && schedulerManager.Initialized() {
logutil.BgLogger().Info("stopping dist task scheduler manager because the current node is not DDL owner anymore", zap.String("id", do.ddl.GetID()))
schedulerManager.Stop()
Expand All @@ -1534,13 +1539,13 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
for {
select {
case <-do.exit:
stopDispatchIfNeeded()
stopSchedulerMgrIfNeeded()
return
case <-ticker.C:
if do.ddl.OwnerManager().IsOwner() {
startDispatchIfNeeded()
startSchedulerMgrIfNeeded()
} else {
stopDispatchIfNeeded()
stopSchedulerMgrIfNeeded()
}
}
}
Expand Down