Skip to content

Commit

Permalink
disttask: add load data dispatcher (#42592)
Browse files Browse the repository at this point in the history
close #42591
  • Loading branch information
GMHDBJD authored Mar 29, 2023
1 parent f3faf95 commit af62a5c
Show file tree
Hide file tree
Showing 18 changed files with 633 additions and 41 deletions.
3 changes: 3 additions & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ header:
- "tidb-binlog/driver/example"
- "tidb-binlog/proto/go-binlog/secondary_binlog.pb.go"
- "**/*.sql"
- "**/*.csv"
- "**/*.parquet"
- "**/*.zst"
- ".bazelversion"
- "build/image/.ci_bazel"
comment: on-failure
8 changes: 4 additions & 4 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ import (
const (
// mydumper
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid tikv trigger region auto split
SplitRegionSize ByteSize = 96 * units.MiB
SplitRegionKeys int = 1_280_000
MaxSplitRegionSizeRatio int = 10

defaultMaxAllowedPacket = 64 * units.MiB

DefaultBatchSize ByteSize = 100 * units.GiB
)

var (
Expand All @@ -44,5 +41,8 @@ var (
PermitWithoutStream: false,
})
// BufferSizeScale is the factor of block buffer size
BufferSizeScale = int64(5)
BufferSizeScale = int64(5)
DefaultBatchSize ByteSize = 100 * units.GiB
// mydumper
MaxRegionSize ByteSize = 256 * units.MiB
)
4 changes: 2 additions & 2 deletions ddl/disttask_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewLitBackfillFlowHandle(getDDL func() DDL) dispatcher.TaskFlowHandle {
}

// ProcessNormalFlow processes the normal flow.
func (h *litBackfillFlowHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *proto.Task) (metas [][]byte, err error) {
func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State != proto.TaskStatePending {
// This flow has only one step, finish task when it is not pending
return nil, nil
Expand Down Expand Up @@ -108,7 +108,7 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *
return subTaskMetas, nil
}

func (*litBackfillFlowHandle) ProcessErrFlow(_ dispatcher.Dispatch, _ *proto.Task, _ string) (meta []byte, err error) {
func (*litBackfillFlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ string) (meta []byte, err error) {
// We do not need extra meta info when rolling back
return nil, nil
}
13 changes: 7 additions & 6 deletions ddl/disttask_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ddl_test

import (
"context"
"encoding/json"
"testing"
"time"
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestBackfillFlowHandle(t *testing.T) {
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp1"))
require.NoError(t, err)
tblInfo := tbl.Meta()
metas, err := handler.ProcessNormalFlow(nil, gTask)
metas, err := handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, proto.StepOne, gTask.Step)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
Expand All @@ -60,18 +61,18 @@ func TestBackfillFlowHandle(t *testing.T) {

// test partition table ProcessNormalFlow after step1 finished
gTask.State = proto.TaskStateRunning
metas, err = handler.ProcessNormalFlow(nil, gTask)
metas, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, 0, len(metas))

// test partition table ProcessErrFlow
errMeta, err := handler.ProcessErrFlow(nil, gTask, "mockErr")
errMeta, err := handler.ProcessErrFlow(context.Background(), nil, gTask, "mockErr")
require.NoError(t, err)
require.Nil(t, errMeta)

// test merging index
gTask = createAddIndexGlobalTask(t, dom, "test", "tp1", ddl.FlowHandleLitMergeType)
metas, err = handler.ProcessNormalFlow(nil, gTask)
metas, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, proto.StepOne, gTask.Step)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
Expand All @@ -81,14 +82,14 @@ func TestBackfillFlowHandle(t *testing.T) {
require.Equal(t, par.ID, subTask.PhysicalTableID)
}

errMeta, err = handler.ProcessErrFlow(nil, gTask, "mockErr")
errMeta, err = handler.ProcessErrFlow(context.Background(), nil, gTask, "mockErr")
require.NoError(t, err)
require.Nil(t, errMeta)

// test normal table not supported yet
tk.MustExec("create table t1(id int primary key, v int)")
gTask = createAddIndexGlobalTask(t, dom, "test", "t1", ddl.FlowHandleLitBackfillType)
_, err = handler.ProcessNormalFlow(nil, gTask)
_, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.EqualError(t, err, "Non-partition table not supported yet")
}

Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "dispatcher",
srcs = [
"dispatcher.go",
"dispatcher_mock.go",
"register.go",
],
importpath = "github.com/pingcap/tidb/disttask/framework/dispatcher",
Expand All @@ -16,6 +17,7 @@ go_library(
"//util/logutil",
"//util/syncutil",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//mock",
"@org_uber_go_zap//:zap",
],
)
Expand Down
19 changes: 13 additions & 6 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@ const (
type Dispatch interface {
// Start enables dispatching and monitoring mechanisms.
Start()
// GetTaskAllInstances gets all available instances that handle the task.
GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error)
// GetAllSchedulerIDs gets the IDs of all scheduler instances that handle the task.
GetAllSchedulerIDs(ctx context.Context, gTaskID int64) ([]string, error)
// Stop stops the dispatcher.
Stop()
}

// TaskHandle provides the interface for operations needed by task flow handles.
type TaskHandle interface {
// GetAllSchedulerIDs returns the IDs of all scheduler instances that handle the given task.
GetAllSchedulerIDs(ctx context.Context, gTaskID int64) ([]string, error)
}

func (d *dispatcher) getRunningGlobalTasks() map[int64]*proto.Task {
d.runningGlobalTasks.RLock()
defer d.runningGlobalTasks.RUnlock()
Expand Down Expand Up @@ -246,14 +252,14 @@ func (d *dispatcher) updateTaskRevertInfo(gTask *proto.Task) {

func (d *dispatcher) processErrFlow(gTask *proto.Task, receiveErr string) error {
// TODO: GetTaskFlowHandle may fail during rolling upgrades.
meta, err := GetTaskFlowHandle(gTask.Type).ProcessErrFlow(d, gTask, receiveErr)
meta, err := GetTaskFlowHandle(gTask.Type).ProcessErrFlow(d.ctx, d, gTask, receiveErr)
if err != nil {
logutil.BgLogger().Warn("handle error failed", zap.Error(err))
return err
}

// TODO: Consider using a new context.
instanceIDs, err := d.GetTaskAllInstances(d.ctx, gTask.ID)
instanceIDs, err := d.GetAllSchedulerIDs(d.ctx, gTask.ID)
if err != nil {
logutil.BgLogger().Warn("get global task's all instances failed", zap.Error(err))
return err
Expand Down Expand Up @@ -292,7 +298,7 @@ func (d *dispatcher) processNormalFlow(gTask *proto.Task) (err error) {
d.updateTaskRevertInfo(gTask)
return errors.Errorf("%s type handle doesn't register", gTask.Type)
}
metas, err := handle.ProcessNormalFlow(d, gTask)
metas, err := handle.ProcessNormalFlow(d.ctx, d, gTask)
if err != nil {
logutil.BgLogger().Warn("gen dist-plan failed", zap.Error(err))
return err
Expand Down Expand Up @@ -378,7 +384,8 @@ func GetEligibleInstance(ctx context.Context) (string, error) {
return "", errors.New("not found instance")
}

func (d *dispatcher) GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error) {
// GetAllSchedulerIDs gets all the scheduler IDs.
func (d *dispatcher) GetAllSchedulerIDs(ctx context.Context, gTaskID int64) ([]string, error) {
if len(MockTiDBIDs) != 0 {
return MockTiDBIDs, nil
}
Expand Down
35 changes: 35 additions & 0 deletions disttask/framework/dispatcher/dispatcher_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"context"

"github.com/stretchr/testify/mock"
)

// MockHandle is a mock implementation of TaskHandle built on testify's mock.Mock.
type MockHandle struct {
mock.Mock
}

// GetAllSchedulerIDs implements TaskHandle.GetAllSchedulerIDs.
//
// It records the call on the embedded mock and replays the values configured
// for it: return value 0 is the scheduler ID list ([]string) and return value
// 1 is the error. A non-nil error takes precedence and yields a nil slice. An
// untyped nil ID list is returned as a nil slice instead of panicking on the
// type assertion; any other non-[]string value still panics, which surfaces a
// misconfigured expectation.
func (m *MockHandle) GetAllSchedulerIDs(ctx context.Context, gTaskID int64) ([]string, error) {
	args := m.Called(ctx, gTaskID)
	if err := args.Error(1); err != nil {
		return nil, err
	}
	ids := args.Get(0)
	if ids == nil {
		// Return(nil, nil) stores an untyped nil; asserting it to []string would panic.
		return nil, nil
	}
	return ids.([]string), nil
}
12 changes: 6 additions & 6 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestGetInstance(t *testing.T) {
instanceID, err := dispatcher.GetEligibleInstance(ctx)
require.Lenf(t, instanceID, 0, "instanceID:%d", instanceID)
require.EqualError(t, err, "not found instance")
instanceIDs, err := dsp.GetTaskAllInstances(ctx, 1)
instanceIDs, err := dsp.GetAllSchedulerIDs(ctx, 1)
require.Lenf(t, instanceIDs, 0, "instanceID:%d", instanceID)
require.NoError(t, err)

Expand All @@ -85,7 +85,7 @@ func TestGetInstance(t *testing.T) {
if instanceID != uuids[0] && instanceID != uuids[1] {
require.FailNowf(t, "expected uuids:%d,%d, actual uuid:%d", uuids[0], uuids[1], instanceID)
}
instanceIDs, err = dsp.GetTaskAllInstances(ctx, 1)
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, 1)
require.Lenf(t, instanceIDs, 0, "instanceID:%d", instanceID)
require.NoError(t, err)

Expand All @@ -99,7 +99,7 @@ func TestGetInstance(t *testing.T) {
}
err = subTaskMgr.AddNewTask(gTaskID, subtask.SchedulerID, nil, subtask.Type, true)
require.NoError(t, err)
instanceIDs, err = dsp.GetTaskAllInstances(ctx, gTaskID)
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, gTaskID)
require.NoError(t, err)
require.Equal(t, []string{uuids[1]}, instanceIDs)
// server ids: uuid0, uuid1
Expand All @@ -111,7 +111,7 @@ func TestGetInstance(t *testing.T) {
}
err = subTaskMgr.AddNewTask(gTaskID, subtask.SchedulerID, nil, subtask.Type, true)
require.NoError(t, err)
instanceIDs, err = dsp.GetTaskAllInstances(ctx, gTaskID)
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, gTaskID)
require.NoError(t, err)
require.Len(t, instanceIDs, len(uuids))
require.ElementsMatch(t, instanceIDs, uuids)
Expand Down Expand Up @@ -191,7 +191,7 @@ const taskTypeExample = "task_example"
// NumberExampleHandle is a stateless example flow handle; ProcessNormalFlow
// and ProcessErrFlow are defined on it.
type NumberExampleHandle struct{}

func (n NumberExampleHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *proto.Task) (metas [][]byte, err error) {
func (n NumberExampleHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State == proto.TaskStatePending {
gTask.Step = proto.StepInit
}
Expand All @@ -211,7 +211,7 @@ func (n NumberExampleHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *pro
return metas, nil
}

func (n NumberExampleHandle) ProcessErrFlow(_ dispatcher.Dispatch, _ *proto.Task, _ string) (meta []byte, err error) {
// ProcessErrFlow ignores all of its arguments and returns no meta and no error.
func (n NumberExampleHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ string) (meta []byte, err error) {
// Nothing to handle: this example produces no meta for the error flow.
return nil, nil
}
6 changes: 4 additions & 2 deletions disttask/framework/dispatcher/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
package dispatcher

import (
"context"

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/util/syncutil"
)

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
ProcessNormalFlow(d Dispatch, gTask *proto.Task) (metas [][]byte, err error)
ProcessErrFlow(d Dispatch, gTask *proto.Task, receive string) (meta []byte, err error)
ProcessNormalFlow(ctx context.Context, h TaskHandle, gTask *proto.Task) (metas [][]byte, err error)
ProcessErrFlow(ctx context.Context, h TaskHandle, gTask *proto.Task, receive string) (meta []byte, err error)
}

var taskFlowHandleMap struct {
Expand Down
39 changes: 37 additions & 2 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,15 +1,50 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "loaddata",
srcs = ["proto.go"],
srcs = [
"dispatcher.go",
"proto.go",
"wrapper.go",
],
importpath = "github.com/pingcap/tidb/disttask/loaddata",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//br/pkg/storage",
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//executor/importer",
"//parser/model",
"//parser/mysql",
"//util/intest",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "loaddata_test",
timeout = "short",
srcs = [
"dispatcher_test.go",
"wrapper_test.go",
],
data = glob(["testdata/**"]),
embed = [":loaddata"],
flaky = True,
deps = [
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//executor/importer",
"//parser/model",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit af62a5c

Please sign in to comment.