-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
disttask: add load data dispatcher #42592
Changes from 7 commits
f975ad5
5ad5939
d007732
34665c4
816ec8c
1deabfa
b7f372a
4adce49
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -50,6 +50,12 @@ type Dispatch interface { | |||||
Stop() | ||||||
} | ||||||
|
||||||
// Handle provides the interface for operations needed by task flow handles. | ||||||
type Handle interface { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
// GetTaskAllInstances gets all available instances of the task. | ||||||
GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error) | ||||||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||
} | ||||||
|
||||||
func (d *dispatcher) getRunningGlobalTasks() map[int64]*proto.Task { | ||||||
d.runningGlobalTasks.RLock() | ||||||
defer d.runningGlobalTasks.RUnlock() | ||||||
|
@@ -246,7 +252,7 @@ func (d *dispatcher) updateTaskRevertInfo(gTask *proto.Task) { | |||||
|
||||||
func (d *dispatcher) processErrFlow(gTask *proto.Task, receiveErr string) error { | ||||||
// TODO: GetTaskFlowHandle may fail during rolling upgrades. | ||||||
meta, err := GetTaskFlowHandle(gTask.Type).ProcessErrFlow(d, gTask, receiveErr) | ||||||
meta, err := GetTaskFlowHandle(gTask.Type).ProcessErrFlow(d.ctx, d, gTask, receiveErr) | ||||||
if err != nil { | ||||||
logutil.BgLogger().Warn("handle error failed", zap.Error(err)) | ||||||
return err | ||||||
|
@@ -292,7 +298,7 @@ func (d *dispatcher) processNormalFlow(gTask *proto.Task) (err error) { | |||||
d.updateTaskRevertInfo(gTask) | ||||||
return errors.Errorf("%s type handle doesn't register", gTask.Type) | ||||||
} | ||||||
metas, err := handle.ProcessNormalFlow(d, gTask) | ||||||
metas, err := handle.ProcessNormalFlow(d.ctx, d, gTask) | ||||||
if err != nil { | ||||||
logutil.BgLogger().Warn("gen dist-plan failed", zap.Error(err)) | ||||||
return err | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in https://github.com/stretchr/testify#mock-package
Can we add a makefile entry for it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can add in later pr with https://github.com/pingcap/tidb/blob/master/disttask/framework/scheduler/interface_mock.go There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. generate it with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, in next pr #42592 (comment) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package dispatcher | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
// MockHandle is used to mock the Handle. | ||
type MockHandle struct { | ||
mock.Mock | ||
} | ||
|
||
// GetTaskAllInstances implements the Handle.GetTaskAllInstances interface. | ||
func (m *MockHandle) GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error) { | ||
args := m.Called(ctx, gTaskID) | ||
if args.Error(1) != nil { | ||
return nil, args.Error(1) | ||
} | ||
return args.Get(0).([]string), nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,14 +15,16 @@ | |
package dispatcher | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/pingcap/tidb/disttask/framework/proto" | ||
"github.com/pingcap/tidb/util/syncutil" | ||
) | ||
|
||
// TaskFlowHandle is used to control the process operations for each global task. | ||
type TaskFlowHandle interface { | ||
ProcessNormalFlow(d Dispatch, gTask *proto.Task) (metas [][]byte, err error) | ||
ProcessErrFlow(d Dispatch, gTask *proto.Task, receive string) (meta []byte, err error) | ||
ProcessNormalFlow(ctx context.Context, h Handle, gTask *proto.Task) (metas [][]byte, err error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the comment says this is for each task, so why we need to specify There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TaskFlowHandle gets the gTask from dispatcher, and then split the gTask to subtask There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems different types of task has different TaskFlowHandle implementation, can we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PTAL @zimulala , maybe let |
||
ProcessErrFlow(ctx context.Context, h Handle, gTask *proto.Task, receive string) (meta []byte, err error) | ||
} | ||
|
||
var taskFlowHandleMap struct { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,49 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library") | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "loaddata", | ||
srcs = ["proto.go"], | ||
srcs = [ | ||
"dispatcher.go", | ||
"proto.go", | ||
"wrapper.go", | ||
], | ||
importpath = "github.com/pingcap/tidb/disttask/loaddata", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//br/pkg/lightning/backend", | ||
"//br/pkg/lightning/config", | ||
"//br/pkg/lightning/mydump", | ||
"//br/pkg/storage", | ||
"//disttask/framework/dispatcher", | ||
"//disttask/framework/proto", | ||
"//executor/importer", | ||
"//parser/model", | ||
"//parser/mysql", | ||
"//util/intest", | ||
"//util/logutil", | ||
"@com_github_pingcap_errors//:errors", | ||
"@org_uber_go_zap//:zap", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "loaddata_test", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add
|
||
timeout = "short", | ||
srcs = [ | ||
"dispatcher_test.go", | ||
"wrapper_test.go", | ||
], | ||
data = glob(["testdata/**"]), | ||
embed = [":loaddata"], | ||
flaky = True, | ||
deps = [ | ||
"//br/pkg/lightning/config", | ||
"//br/pkg/lightning/mydump", | ||
"//disttask/framework/dispatcher", | ||
"//disttask/framework/proto", | ||
"//executor/importer", | ||
"//parser/model", | ||
"@com_github_stretchr_testify//mock", | ||
"@com_github_stretchr_testify//require", | ||
], | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package loaddata | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
|
||
"github.com/pingcap/tidb/disttask/framework/dispatcher" | ||
"github.com/pingcap/tidb/disttask/framework/proto" | ||
"github.com/pingcap/tidb/util/logutil" | ||
"go.uber.org/zap" | ||
"golang.org/x/exp/maps" | ||
) | ||
|
||
// FlowHandle is the dispatcher for load data. | ||
type FlowHandle struct{} | ||
|
||
// ProcessNormalFlow implements dispatcher.TaskFlowHandle interface. | ||
func (*FlowHandle) ProcessNormalFlow(ctx context.Context, dispatch dispatcher.Handle, gTask *proto.Task) ([][]byte, error) { | ||
taskMeta := &TaskMeta{} | ||
err := json.Unmarshal(gTask.Meta, taskMeta) | ||
if err != nil { | ||
return nil, err | ||
} | ||
logutil.BgLogger().Info("process normal flow", zap.Any("task_meta", taskMeta), zap.Any("step", gTask.Step)) | ||
|
||
switch gTask.Step { | ||
case Import: | ||
gTask.State = proto.TaskStateSucceed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the difference of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will each step has the full lifetime of state transition? Or step is changed only in TaskStateRunning and only the final step will finish the state? Please add comment |
||
return nil, nil | ||
default: | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
instances, err := dispatch.GetTaskAllInstances(ctx, gTask.ID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
subtaskMetas, err := generateSubtaskMetas(ctx, taskMeta, len(instances)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
logutil.BgLogger().Info("generate subtasks", zap.Any("subtask_metas", subtaskMetas)) | ||
metaBytes := make([][]byte, 0, len(taskMeta.FileInfos)) | ||
for _, subtaskMeta := range subtaskMetas { | ||
bs, err := json.Marshal(subtaskMeta) | ||
if err != nil { | ||
return nil, err | ||
} | ||
metaBytes = append(metaBytes, bs) | ||
} | ||
gTask.Step = Import | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add comment to the interface to mention that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zimulala Not sure if you can fix it in your PR conveniently There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll comment on that |
||
return metaBytes, nil | ||
} | ||
|
||
// ProcessErrFlow implements dispatcher.TaskFlowHandle interface. | ||
func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.Handle, _ *proto.Task, errMsg string) ([]byte, error) { | ||
logutil.BgLogger().Info("process error flow", zap.String("error message", errMsg)) | ||
return nil, nil | ||
} | ||
|
||
func generateSubtaskMetas(ctx context.Context, task *TaskMeta, concurrency int) ([]*SubtaskMeta, error) { | ||
tableRegions, err := makeTableRegions(ctx, task, concurrency) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
subtaskMetaMap := make(map[int32]*SubtaskMeta) | ||
for _, region := range tableRegions { | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
subtaskMeta, ok := subtaskMetaMap[region.EngineID] | ||
if !ok { | ||
subtaskMeta = &SubtaskMeta{ | ||
Table: task.Table, | ||
Format: task.Format, | ||
Dir: task.Dir, | ||
} | ||
subtaskMetaMap[region.EngineID] = subtaskMeta | ||
} | ||
subtaskMeta.Chunks = append(subtaskMeta.Chunks, Chunk{ | ||
Path: region.FileMeta.Path, | ||
Offset: region.Chunk.Offset, | ||
EndOffset: region.Chunk.EndOffset, | ||
RealOffset: region.Chunk.RealOffset, | ||
PrevRowIDMax: region.Chunk.PrevRowIDMax, | ||
RowIDMax: region.Chunk.RowIDMax, | ||
}) | ||
} | ||
return maps.Values[map[int32]*SubtaskMeta](subtaskMetaMap), nil | ||
} | ||
|
||
func init() { | ||
hawkingrei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &FlowHandle{}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need to use lock inside RegisterTaskFlowHandle?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But we're not sure that all the calls are inside the init function |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest adding
race = "on",
to go_test
to help detect data races. You can decide whether to enable it depending on the situation.