disttask: add load data dispatcher #42592
)

go_test(
    name = "loaddata_test",
Please add
timeout = "short",
flaky = True,
/retest
From https://github.com/stretchr/testify#mock-package:
"You can use the mockery tool to autogenerate the mock code against an interface as well, making using mocks much quicker."
Can we add a Makefile entry for it?
I can add it in a later PR, together with https://github.com/pingcap/tidb/blob/master/disttask/framework/scheduler/interface_mock.go
 	"github.com/pingcap/tidb/disttask/framework/proto"
 	"github.com/pingcap/tidb/util/syncutil"
 )

 // TaskFlowHandle is used to control the process operations for each global task.
 type TaskFlowHandle interface {
-	ProcessNormalFlow(d Dispatch, gTask *proto.Task) (metas [][]byte, err error)
-	ProcessErrFlow(d Dispatch, gTask *proto.Task, receive string) (meta []byte, err error)
+	ProcessNormalFlow(ctx context.Context, h Handle, gTask *proto.Task) (metas [][]byte, err error)
The comment says this handle is per task, so why do we need to pass gTask as an argument?
TaskFlowHandle gets the gTask from the dispatcher and then splits it into subtasks.
It seems that different task types have different TaskFlowHandle implementations. Can we use gTask to create a new struct that serves as the receiver of these methods? In other words, make ProcessNormalFlow a method of the task object, to avoid passing gTask as a parameter.
PTAL @zimulala. Maybe let RegisterTaskFlowHandle register a factory function.
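One way the factory idea could look, as a rough sketch only. Everything here is hypothetical and not taken from the PR: `Task` stands in for `proto.Task`, and `TaskFlow`, `FlowFactory`, `RegisterFlowFactory`, and `NewTaskFlow` are made-up names. The point is that the task is bound once at construction, so the flow methods no longer take `gTask` as a parameter. Synchronization of the registry is deliberately left out of this sketch.

```go
package main

import "fmt"

// Task is a hypothetical stand-in for proto.Task.
type Task struct {
	ID   int64
	Type string
}

// TaskFlow is a hypothetical per-task handle. The task is captured when the
// flow is created, so methods do not need a gTask parameter.
type TaskFlow interface {
	ProcessNormalFlow() (metas [][]byte, err error)
}

// FlowFactory builds a TaskFlow bound to one global task.
type FlowFactory func(gTask *Task) TaskFlow

// factories maps a task type to its factory. No locking is shown here;
// this sketch assumes registration finishes before any lookup.
var factories = map[string]FlowFactory{}

// RegisterFlowFactory registers a factory instead of a shared handle instance.
func RegisterFlowFactory(taskType string, f FlowFactory) {
	factories[taskType] = f
}

// NewTaskFlow looks up the factory for the task's type and binds the task.
func NewTaskFlow(gTask *Task) (TaskFlow, error) {
	f, ok := factories[gTask.Type]
	if !ok {
		return nil, fmt.Errorf("no flow factory registered for task type %q", gTask.Type)
	}
	return f(gTask), nil
}

// loadDataFlow is an illustrative implementation for a "LoadData" task type.
type loadDataFlow struct {
	gTask *Task
}

// ProcessNormalFlow returns one placeholder subtask meta for the bound task.
func (l *loadDataFlow) ProcessNormalFlow() ([][]byte, error) {
	meta := []byte(fmt.Sprintf("subtask-of-%d", l.gTask.ID))
	return [][]byte{meta}, nil
}

func main() {
	RegisterFlowFactory("LoadData", func(g *Task) TaskFlow {
		return &loadDataFlow{gTask: g}
	})

	flow, err := NewTaskFlow(&Task{ID: 1, Type: "LoadData"})
	if err != nil {
		panic(err)
	}
	metas, err := flow.ProcessNormalFlow()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(metas[0]))
}
```

The trade-off is one small allocation per task in exchange for a narrower method signature; whether that is worth it here is a judgment call for the PR authors.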
	switch gTask.Step {
	case Import:
		gTask.State = proto.TaskStateSucceed
What's the difference between Step and State? Please add comments.
Does each step go through the full lifetime of state transitions? Or does the step change only while the task is in TaskStateRunning, with only the final step finishing the state? Please add a comment.
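To make the question concrete, here is a small sketch of the second reading: the step advances while the state stays running, and only the final step moves the state to succeed. This is an assumption for illustration, not a statement of how the framework behaves. `Task`, `TaskState`, `TaskStep`, `StepInit`, and `advance` are hypothetical names; only `Import` and the state names echo the snippet under review.

```go
package main

import "fmt"

// TaskState is the lifecycle of the whole global task (hypothetical type).
type TaskState string

const (
	TaskStateRunning TaskState = "running"
	TaskStateSucceed TaskState = "succeed"
)

// TaskStep is the position inside the task's own workflow (hypothetical type).
type TaskStep int

const (
	StepInit TaskStep = iota // hypothetical initial step, not from the PR
	Import                   // step name taken from the snippet under review
)

// Task is a hypothetical stand-in for proto.Task.
type Task struct {
	Step  TaskStep
	State TaskState
}

// advance moves the task to its next step.
// NOTE: gTask is modified in place (both Step and State may change).
// It reports whether new subtasks would be dispatched for the next step.
func advance(gTask *Task) bool {
	switch gTask.Step {
	case Import:
		// Final step is done: only now does State leave "running".
		gTask.State = TaskStateSucceed
		return false
	default:
		// Intermediate transition: Step changes, State stays as it was.
		gTask.Step = Import
		return true
	}
}

func main() {
	task := &Task{Step: StepInit, State: TaskStateRunning}
	for {
		more := advance(task)
		fmt.Printf("step=%d state=%s more=%v\n", task.Step, task.State, more)
		if !more {
			break
		}
	}
}
```

Under this reading, Step is workflow-specific and State is framework-wide, which is the distinction the requested comments would need to spell out.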
		}
		metaBytes = append(metaBytes, bs)
	}
	gTask.Step = Import
Please add a comment to the interface mentioning that gTask will be modified inside this method.
@zimulala Not sure if you can conveniently fix it in your PR.
I'll comment on that
}

func init() {
	dispatcher.RegisterTaskFlowHandle(proto.LoadData, &FlowHandle{})
Is there no need to use a lock inside RegisterTaskFlowHandle? From the Go spec:
Package initialization—variable initialization and the invocation of init functions—happens in a single goroutine, sequentially, one package at a time.
But we can't be sure that all the calls are made from init functions.
@@ -50,6 +50,12 @@ type Dispatch interface {
 	Stop()
 }

+// Handle provides the interface for operations needed by task flow handles.
+type Handle interface {
Suggested change:
-type Handle interface {
+type TaskHandle interface {
+// Handle provides the interface for operations needed by task flow handles.
+type Handle interface {
+	// GetTaskAllInstances gets handles the task's all available instances.
+	GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error)
GetAllSchedulerIDs?
done
Generate it with mockgen? And put it into the Makefile.
OK, in the next PR: #42592 (comment)
@@ -16,6 +17,7 @@ go_library(
         "//util/logutil",
         "//util/syncutil",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_stretchr_testify//mock",
I suggest adding race = "on" to go_test to help find data races. You can decide whether to enable it depending on the situation.
/merge
This pull request has been accepted and is ready to merge. Commit hash: 4adce49
What problem does this PR solve?
Issue Number: close #42591
Problem Summary:
What is changed and how it works?
Check List
Tests
Side effects
Documentation
Release note