Skip to content

Commit

Permalink
disttask: start dispatcher/scheduler of distatsk framework when tidb …
Browse files Browse the repository at this point in the history
…start (#42543)

close #42481
  • Loading branch information
lcwangchao authored Mar 29, 2023
1 parent dbfeef9 commit fa08c36
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 0 deletions.
16 changes: 16 additions & 0 deletions disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "framework_test",
timeout = "short",
srcs = ["framework_test.go"],
flaky = True,
deps = [
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//disttask/framework/scheduler",
"//disttask/framework/storage",
"//testkit",
"@com_github_stretchr_testify//require",
],
)
1 change: 1 addition & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//util/syncutil",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//mock",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
)
Expand Down
8 changes: 8 additions & 0 deletions disttask/framework/dispatcher/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/util/syncutil"
"golang.org/x/exp/maps"
)

// TaskFlowHandle is used to control the process operations for each global task.
Expand All @@ -39,6 +40,13 @@ func RegisterTaskFlowHandle(taskType string, dispatcherHandle TaskFlowHandle) {
taskFlowHandleMap.Unlock()
}

// ClearTaskFlowHandle is only used in test
func ClearTaskFlowHandle() {
taskFlowHandleMap.Lock()
maps.Clear(taskFlowHandleMap.handleMap)
taskFlowHandleMap.Unlock()
}

// GetTaskFlowHandle is used to get the global task handle.
func GetTaskFlowHandle(taskType string) TaskFlowHandle {
taskFlowHandleMap.Lock()
Expand Down
117 changes: 117 additions & 0 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package framework_test

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/tidb/disttask/framework/dispatcher"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/scheduler"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

type testFlowHandle struct {
}

func (*testFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State == proto.TaskStatePending {
gTask.Step = proto.StepOne
return [][]byte{
[]byte("task1"),
[]byte("task2"),
}, nil
}
return nil, nil
}

func (*testFlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ string) (meta []byte, err error) {
return nil, nil
}

type testMiniTask struct{}

func (testMiniTask) IsMinimalTask() {}

type testScheduler struct{}

func (*testScheduler) InitSubtaskExecEnv(_ context.Context) error { return nil }

func (t *testScheduler) CleanupSubtaskExecEnv(_ context.Context) error { return nil }

func (t *testScheduler) Rollback(_ context.Context) error { return nil }

func (t *testScheduler) SplitSubtask(_ []byte) []proto.MinimalTask {
return []proto.MinimalTask{
testMiniTask{},
testMiniTask{},
testMiniTask{},
}
}

type testSubtaskExecutor struct {
v *atomic.Int64
}

func (e *testSubtaskExecutor) Run(_ context.Context) error {
e.v.Add(1)
return nil
}

func TestFrameworkStartUp(t *testing.T) {
defer dispatcher.ClearTaskFlowHandle()
defer scheduler.ClearSchedulers()

var v atomic.Int64
dispatcher.ClearTaskFlowHandle()
dispatcher.RegisterTaskFlowHandle("type1", &testFlowHandle{})
scheduler.ClearSchedulers()
scheduler.RegisterSchedulerConstructor("type1", func(_ []byte, _ int64) (scheduler.Scheduler, error) {
return &testScheduler{}, nil
})
scheduler.RegisterSubtaskExectorConstructor("type1", func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) {
return &testSubtaskExecutor{v: &v}, nil
})

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
gm := storage.NewGlobalTaskManager(context.TODO(), tk.Session())
taskID, err := gm.AddNewTask("key1", "type1", 8, nil)
require.NoError(t, err)
start := time.Now()

var task *proto.Task
for {
if time.Since(start) > 2*time.Minute {
require.FailNow(t, "timeout")
}

time.Sleep(time.Second)
task, err = gm.GetTaskByID(taskID)
require.NoError(t, err)
require.NotNil(t, task)
if task.State != proto.TaskStatePending && task.State != proto.TaskStateRunning {
break
}
}

require.Equal(t, proto.TaskStateSucceed, task.State)
require.Equal(t, int64(6), v.Load())
}
8 changes: 8 additions & 0 deletions disttask/framework/scheduler/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,11 @@ func RegisterSubtaskExectorConstructor(taskType string, constructor SubtaskExecu
}
subtaskExecutorOptions[taskType] = option
}

// ClearSchedulers is only used in test
func ClearSchedulers() {
schedulerConstructors = make(map[string]Constructor)
schedulerOptions = make(map[string]schedulerRegisterOptions)
subtaskExecutorConstructors = make(map[string]SubtaskExecutorConstructor)
subtaskExecutorOptions = make(map[string]subtaskExecutorRegisterOptions)
}
4 changes: 4 additions & 0 deletions disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func TestGlobalTaskTable(t *testing.T) {
require.Len(t, task5, 1)
require.Equal(t, task, task5[0])

task6, err := gm.GetTaskByKey("key1")
require.NoError(t, err)
require.Equal(t, task, task6)

// test cannot insert task with dup key
_, err = gm.AddNewTask("key1", "test2", 4, []byte("test2"))
require.EqualError(t, err, "[kv:1062]Duplicate entry 'key1' for key 'tidb_global_task.task_key'")
Expand Down
16 changes: 16 additions & 0 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,22 @@ func (stm *GlobalTaskManager) GetTaskByID(taskID int64) (task *proto.Task, err e
return row2GlobeTask(rs[0]), nil
}

// GetTaskByKey gets the task by the task key
func (stm *GlobalTaskManager) GetTaskByKey(key string) (task *proto.Task, err error) {
stm.mu.Lock()
defer stm.mu.Unlock()

rs, err := execSQL(stm.ctx, stm.se, "select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step from mysql.tidb_global_task where task_key = %?", key)
if err != nil {
return task, err
}
if len(rs) == 0 {
return nil, nil
}

return row2GlobeTask(rs[0]), nil
}

// SubTaskManager is the manager of subtask.
type SubTaskManager struct {
ctx context.Context
Expand Down
3 changes: 3 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ go_library(
"//ddl/placement",
"//ddl/schematracker",
"//ddl/util",
"//disttask/framework/dispatcher",
"//disttask/framework/scheduler",
"//disttask/framework/storage",
"//domain/globalconfigsync",
"//domain/infosync",
"//domain/metrics",
Expand Down
92 changes: 92 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ import (
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/schematracker"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/disttask/framework/dispatcher"
"github.com/pingcap/tidb/disttask/framework/scheduler"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/globalconfigsync"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/errno"
Expand Down Expand Up @@ -1127,6 +1130,10 @@ func (do *Domain) Init(
if err != nil {
return err
}

if err = do.initDistTaskLoop(ctx); err != nil {
return err
}
// step 3: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err = do.ddl.Start(sysCtxPool)
if err != nil {
Expand All @@ -1153,6 +1160,7 @@ func (do *Domain) Init(
do.closestReplicaReadCheckLoop(ctx, pdCli)
}, "closestReplicaReadCheckLoop")
}

err = do.initLogBackup(ctx, pdCli)
if err != nil {
return err
Expand Down Expand Up @@ -1313,6 +1321,90 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro
return nil
}

func (do *Domain) initDistTaskLoop(ctx context.Context) error {
se1, err := do.sysExecutorFactory(do)
if err != nil {
return err
}

se2, err := do.sysExecutorFactory(do)
if err != nil {
se1.Close()
return err
}

gm := storage.NewGlobalTaskManager(kv.WithInternalSourceType(ctx, kv.InternalDistTask), se1.(sessionctx.Context))
sm := storage.NewSubTaskManager(kv.WithInternalSourceType(ctx, kv.InternalDistTask), se2.(sessionctx.Context))
schedulerManager, err := scheduler.NewManagerBuilder().BuildManager(ctx, do.ddl.GetID(), gm, sm)
if err != nil {
se1.Close()
se2.Close()
return err
}

storage.SetGlobalTaskManager(gm)
storage.SetSubTaskManager(sm)
do.wg.Run(func() {
defer func() {
storage.SetGlobalTaskManager(nil)
storage.SetSubTaskManager(nil)
se1.Close()
se2.Close()
}()
do.distTaskFrameworkLoop(ctx, gm, sm, schedulerManager)
}, "distTaskFrameworkLoop")
return nil
}

func (do *Domain) distTaskFrameworkLoop(ctx context.Context, globalTaskManager *storage.GlobalTaskManager, subtaskManager *storage.SubTaskManager, schedulerManager *scheduler.Manager) {
schedulerManager.Start()
logutil.BgLogger().Info("dist task scheduler started")
defer func() {
logutil.BgLogger().Info("stopping dist task scheduler")
schedulerManager.Stop()
logutil.BgLogger().Info("dist task scheduler stopped")
}()

var dispatch dispatcher.Dispatch
startDispatchIfNeeded := func() {
if dispatch != nil {
return
}

newDispatch, err := dispatcher.NewDispatcher(ctx, globalTaskManager, subtaskManager)
if err != nil {
logutil.BgLogger().Error("failed to create a disttask dispatcher", zap.Error(err))
return
}
dispatch = newDispatch
dispatch.Start()
logutil.BgLogger().Info("a new dist task dispatcher started for current node becomes the DDL owner")
}
stopDispatchIfNeeded := func() {
if dispatch != nil {
logutil.BgLogger().Info("stopping dist task dispatcher because the current node is not DDL owner anymore")
dispatch.Stop()
dispatch = nil
logutil.BgLogger().Info("dist task dispatcher stopped")
}
}

ticker := time.Tick(time.Second)
for {
select {
case <-do.exit:
stopDispatchIfNeeded()
return
case <-ticker:
if do.ddl.OwnerManager().IsOwner() {
startDispatchIfNeeded()
} else {
stopDispatchIfNeeded()
}
}
}
}

type sessionPool struct {
resources chan pools.Resource
factory pools.Factory
Expand Down

0 comments on commit fa08c36

Please sign in to comment.