Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: add load data dispatcher #42592

Merged
merged 8 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ header:
- "tidb-binlog/driver/example"
- "tidb-binlog/proto/go-binlog/secondary_binlog.pb.go"
- "**/*.sql"
- "**/*.csv"
- "**/*.parquet"
- "**/*.zst"
- ".bazelversion"
- "build/image/.ci_bazel"
comment: on-failure
8 changes: 4 additions & 4 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ import (
const (
// mydumper
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid tikv trigger region auto split
SplitRegionSize ByteSize = 96 * units.MiB
SplitRegionKeys int = 1_280_000
MaxSplitRegionSizeRatio int = 10

defaultMaxAllowedPacket = 64 * units.MiB

DefaultBatchSize ByteSize = 100 * units.GiB
)

var (
Expand All @@ -44,5 +41,8 @@ var (
PermitWithoutStream: false,
})
// BufferSizeScale is the factor of block buffer size
BufferSizeScale = int64(5)
BufferSizeScale = int64(5)
DefaultBatchSize ByteSize = 100 * units.GiB
// mydumper
MaxRegionSize ByteSize = 256 * units.MiB
)
4 changes: 2 additions & 2 deletions ddl/disttask_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewLitBackfillFlowHandle(getDDL func() DDL) dispatcher.TaskFlowHandle {
}

// ProcessNormalFlow processes the normal flow.
func (h *litBackfillFlowHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *proto.Task) (metas [][]byte, err error) {
func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.Handle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State != proto.TaskStatePending {
// This flow has only one step, finish task when it is not pending
return nil, nil
Expand Down Expand Up @@ -108,7 +108,7 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *
return subTaskMetas, nil
}

func (*litBackfillFlowHandle) ProcessErrFlow(_ dispatcher.Dispatch, _ *proto.Task, _ string) (meta []byte, err error) {
func (*litBackfillFlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.Handle, _ *proto.Task, _ string) (meta []byte, err error) {
// We do not need extra meta info when rolling back
return nil, nil
}
13 changes: 7 additions & 6 deletions ddl/disttask_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ddl_test

import (
"context"
"encoding/json"
"testing"
"time"
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestBackfillFlowHandle(t *testing.T) {
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp1"))
require.NoError(t, err)
tblInfo := tbl.Meta()
metas, err := handler.ProcessNormalFlow(nil, gTask)
metas, err := handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, proto.StepOne, gTask.Step)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
Expand All @@ -60,18 +61,18 @@ func TestBackfillFlowHandle(t *testing.T) {

// test partition table ProcessNormalFlow after step1 finished
gTask.State = proto.TaskStateRunning
metas, err = handler.ProcessNormalFlow(nil, gTask)
metas, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, 0, len(metas))

// test partition table ProcessErrFlow
errMeta, err := handler.ProcessErrFlow(nil, gTask, "mockErr")
errMeta, err := handler.ProcessErrFlow(context.Background(), nil, gTask, "mockErr")
require.NoError(t, err)
require.Nil(t, errMeta)

// test merging index
gTask = createAddIndexGlobalTask(t, dom, "test", "tp1", ddl.FlowHandleLitMergeType)
metas, err = handler.ProcessNormalFlow(nil, gTask)
metas, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, proto.StepOne, gTask.Step)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
Expand All @@ -81,14 +82,14 @@ func TestBackfillFlowHandle(t *testing.T) {
require.Equal(t, par.ID, subTask.PhysicalTableID)
}

errMeta, err = handler.ProcessErrFlow(nil, gTask, "mockErr")
errMeta, err = handler.ProcessErrFlow(context.Background(), nil, gTask, "mockErr")
require.NoError(t, err)
require.Nil(t, errMeta)

// test normal table not supported yet
tk.MustExec("create table t1(id int primary key, v int)")
gTask = createAddIndexGlobalTask(t, dom, "test", "t1", ddl.FlowHandleLitBackfillType)
_, err = handler.ProcessNormalFlow(nil, gTask)
_, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
require.EqualError(t, err, "Non-partition table not supported yet")
}

Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "dispatcher",
srcs = [
"dispatcher.go",
"dispatcher_mock.go",
"register.go",
],
importpath = "github.com/pingcap/tidb/disttask/framework/dispatcher",
Expand All @@ -16,6 +17,7 @@ go_library(
"//util/logutil",
"//util/syncutil",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//mock",
Copy link
Member

@hawkingrei hawkingrei Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we can add race = "on", to go_test for finding the data race. You can decide whether to open it according to the situation.

"@org_uber_go_zap//:zap",
],
)
Expand Down
10 changes: 8 additions & 2 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type Dispatch interface {
Stop()
}

// Handle provides the interface for operations needed by task flow handles.
type Handle interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type Handle interface {
type TaskHandle interface {

// GetTaskAllInstances gets handles the task's all available instances.
GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetAllSchedulerIDs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

func (d *dispatcher) getRunningGlobalTasks() map[int64]*proto.Task {
d.runningGlobalTasks.RLock()
defer d.runningGlobalTasks.RUnlock()
Expand Down Expand Up @@ -246,7 +252,7 @@ func (d *dispatcher) updateTaskRevertInfo(gTask *proto.Task) {

func (d *dispatcher) processErrFlow(gTask *proto.Task, receiveErr string) error {
// TODO: Maybe it gets GetTaskFlowHandle fails when rolling upgrades.
meta, err := GetTaskFlowHandle(gTask.Type).ProcessErrFlow(d, gTask, receiveErr)
meta, err := GetTaskFlowHandle(gTask.Type).ProcessErrFlow(d.ctx, d, gTask, receiveErr)
if err != nil {
logutil.BgLogger().Warn("handle error failed", zap.Error(err))
return err
Expand Down Expand Up @@ -292,7 +298,7 @@ func (d *dispatcher) processNormalFlow(gTask *proto.Task) (err error) {
d.updateTaskRevertInfo(gTask)
return errors.Errorf("%s type handle doesn't register", gTask.Type)
}
metas, err := handle.ProcessNormalFlow(d, gTask)
metas, err := handle.ProcessNormalFlow(d.ctx, d, gTask)
if err != nil {
logutil.BgLogger().Warn("gen dist-plan failed", zap.Error(err))
return err
Expand Down
35 changes: 35 additions & 0 deletions disttask/framework/dispatcher/dispatcher_mock.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in https://github.com/stretchr/testify#mock-package

You can use the mockery tool to autogenerate the mock code against an interface as well, making using mocks much quicker.

Can we add a makefile entry for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generate it with mockgen? and put it into Makefile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, in next pr #42592 (comment)

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"context"

"github.com/stretchr/testify/mock"
)

// MockHandle is used to mock the Handle.
type MockHandle struct {
mock.Mock
}

// GetTaskAllInstances implements the Handle.GetTaskAllInstances interface.
func (m *MockHandle) GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error) {
args := m.Called(ctx, gTaskID)
if args.Error(1) != nil {
return nil, args.Error(1)
}
return args.Get(0).([]string), nil
}
4 changes: 2 additions & 2 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ const taskTypeExample = "task_example"
type NumberExampleHandle struct {
}

func (n NumberExampleHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *proto.Task) (metas [][]byte, err error) {
func (n NumberExampleHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.Handle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State == proto.TaskStatePending {
gTask.Step = proto.StepInit
}
Expand All @@ -211,7 +211,7 @@ func (n NumberExampleHandle) ProcessNormalFlow(_ dispatcher.Dispatch, gTask *pro
return metas, nil
}

func (n NumberExampleHandle) ProcessErrFlow(_ dispatcher.Dispatch, _ *proto.Task, _ string) (meta []byte, err error) {
func (n NumberExampleHandle) ProcessErrFlow(_ context.Context, _ dispatcher.Handle, _ *proto.Task, _ string) (meta []byte, err error) {
// Don't handle not.
return nil, nil
}
6 changes: 4 additions & 2 deletions disttask/framework/dispatcher/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
package dispatcher

import (
"context"

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/util/syncutil"
)

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
ProcessNormalFlow(d Dispatch, gTask *proto.Task) (metas [][]byte, err error)
ProcessErrFlow(d Dispatch, gTask *proto.Task, receive string) (meta []byte, err error)
ProcessNormalFlow(ctx context.Context, h Handle, gTask *proto.Task) (metas [][]byte, err error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment says this is for each task, so why we need to specify gTask as argument?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskFlowHandle gets the gTask from dispatcher, and then split the gTask to subtask

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems different types of task has different TaskFlowHandle implementation, can we use gTask to create a new struct as the receiver of these methods? In other words, ProcessNormalFlow is a method of task object, to avoid use gTask as parameter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL @zimulala , maybe let RegisterTaskFlowHandle register a factory function

ProcessErrFlow(ctx context.Context, h Handle, gTask *proto.Task, receive string) (meta []byte, err error)
}

var taskFlowHandleMap struct {
Expand Down
38 changes: 36 additions & 2 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,15 +1,49 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "loaddata",
srcs = ["proto.go"],
srcs = [
"dispatcher.go",
"proto.go",
"wrapper.go",
],
importpath = "github.com/pingcap/tidb/disttask/loaddata",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//br/pkg/storage",
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//executor/importer",
"//parser/model",
"//parser/mysql",
"//util/intest",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "loaddata_test",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add

timeout = "short",
flaky = True,

timeout = "short",
srcs = [
"dispatcher_test.go",
"wrapper_test.go",
],
data = glob(["testdata/**"]),
embed = [":loaddata"],
flaky = True,
deps = [
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//executor/importer",
"//parser/model",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
],
)
105 changes: 105 additions & 0 deletions disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loaddata

import (
"context"
"encoding/json"

"github.com/pingcap/tidb/disttask/framework/dispatcher"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// FlowHandle is the dispatcher for load data.
type FlowHandle struct{}

// ProcessNormalFlow implements dispatcher.TaskFlowHandle interface.
func (*FlowHandle) ProcessNormalFlow(ctx context.Context, dispatch dispatcher.Handle, gTask *proto.Task) ([][]byte, error) {
taskMeta := &TaskMeta{}
err := json.Unmarshal(gTask.Meta, taskMeta)
if err != nil {
return nil, err
}
logutil.BgLogger().Info("process normal flow", zap.Any("task_meta", taskMeta), zap.Any("step", gTask.Step))

switch gTask.Step {
case Import:
gTask.State = proto.TaskStateSucceed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference of Step and State? Please add comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will each step has the full lifetime of state transition? Or step is changed only in TaskStateRunning and only the final step will finish the state?

Please add comment

return nil, nil
default:
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

instances, err := dispatch.GetTaskAllInstances(ctx, gTask.ID)
if err != nil {
return nil, err
}
subtaskMetas, err := generateSubtaskMetas(ctx, taskMeta, len(instances))
if err != nil {
return nil, err
}
logutil.BgLogger().Info("generate subtasks", zap.Any("subtask_metas", subtaskMetas))
metaBytes := make([][]byte, 0, len(taskMeta.FileInfos))
for _, subtaskMeta := range subtaskMetas {
bs, err := json.Marshal(subtaskMeta)
if err != nil {
return nil, err
}
metaBytes = append(metaBytes, bs)
}
gTask.Step = Import
Copy link
Contributor

@lance6716 lance6716 Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comment to the interface to mention that gTask will be modified inside this method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zimulala Not sure if you can fix it in your PR conveniently

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll comment on that

return metaBytes, nil
}

// ProcessErrFlow implements dispatcher.ProcessErrFlow interface.
func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.Handle, _ *proto.Task, errMsg string) ([]byte, error) {
logutil.BgLogger().Info("process error flow", zap.String("error message", errMsg))
return nil, nil
}

func generateSubtaskMetas(ctx context.Context, task *TaskMeta, concurrency int) ([]*SubtaskMeta, error) {
tableRegions, err := makeTableRegions(ctx, task, concurrency)
if err != nil {
return nil, err
}

subtaskMetaMap := make(map[int32]*SubtaskMeta)
for _, region := range tableRegions {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
subtaskMeta, ok := subtaskMetaMap[region.EngineID]
if !ok {
subtaskMeta = &SubtaskMeta{
Table: task.Table,
Format: task.Format,
Dir: task.Dir,
}
subtaskMetaMap[region.EngineID] = subtaskMeta
}
subtaskMeta.Chunks = append(subtaskMeta.Chunks, Chunk{
Path: region.FileMeta.Path,
Offset: region.Chunk.Offset,
EndOffset: region.Chunk.EndOffset,
RealOffset: region.Chunk.RealOffset,
PrevRowIDMax: region.Chunk.PrevRowIDMax,
RowIDMax: region.Chunk.RowIDMax,
})
}
return maps.Values[map[int32]*SubtaskMeta](subtaskMetaMap), nil
}

func init() {
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &FlowHandle{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to use lock inside RegisterTaskFlowHandle?

https://go.dev/ref/spec

Package initialization—variable initialization and the invocation of init functions—happens in a single goroutine, sequentially, one package at a time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we're not sure that all the calls are inside the init function

}
Loading