Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checker(dm): make start-task/resume-task/check-task return pre-check result only warning #4118

Merged
merged 15 commits into from
Jan 12, 2022
3 changes: 3 additions & 0 deletions dm/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ linters:
# - maligned

linters-settings:
dupl:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# tokens count to trigger issue, 150 by default
threshold: 200
govet:
# report about shadowed variables
check-shadowing: true
Expand Down
98 changes: 73 additions & 25 deletions dm/checker/check_test.go

Large diffs are not rendered by default.

97 changes: 50 additions & 47 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,59 +283,59 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
errs = append(errs, unit.NewProcessError(err))
} else if !result.Summary.Passed {
errs = append(errs, unit.NewProcessError(errors.New("check was failed, please see detail")))
warnLeft, errLeft := c.warnCnt, c.errCnt
}
warnLeft, errLeft := c.warnCnt, c.errCnt

// remove success result if not pass
results := result.Results[:0]
for _, r := range result.Results {
if r.State == checker.StateSuccess {
continue
}
// remove success result if not pass
results := result.Results[:0]
for _, r := range result.Results {
if r.State == checker.StateSuccess {
continue
}

// handle results without r.Errors
if len(r.Errors) == 0 {
switch r.State {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
results = append(results, r)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
results = append(results, r)
// handle results without r.Errors
if len(r.Errors) == 0 {
switch r.State {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
continue
warnLeft--
results = append(results, r)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
results = append(results, r)
}
continue
}

subErrors := make([]*checker.Error, 0, len(r.Errors))
for _, e := range r.Errors {
switch e.Severity {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
subErrors = append(subErrors, e)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
subErrors = append(subErrors, e)
subErrors := make([]*checker.Error, 0, len(r.Errors))
for _, e := range r.Errors {
switch e.Severity {
case checker.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
subErrors = append(subErrors, e)
case checker.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
subErrors = append(subErrors, e)
}
// skip display an empty Result
if len(subErrors) > 0 {
r.Errors = subErrors
results = append(results, r)
}
}
result.Results = results
// skip display an empty Result
if len(subErrors) > 0 {
r.Errors = subErrors
results = append(results, r)
}
}
result.Results = results

c.updateInstruction(result)

Expand All @@ -345,9 +345,12 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
default:
}

rawResult, err := json.MarshalIndent(result, "\t", "\t")
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal error %v", err))
var rawResult []byte
if result.Summary.Successful != result.Summary.Total {
rawResult, err = json.MarshalIndent(result, "\t", "\t")
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal error %v", err))
}
}

c.result.Lock()
Expand Down
27 changes: 17 additions & 10 deletions dm/checker/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,31 @@ package checker

import (
"context"
"fmt"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

var (
// ErrorMsgHeader used as the header of the error message when checking config failed.
ErrorMsgHeader = "fail to check synchronization configuration with type"
// CheckTaskMsgHeader used as the header of the error/warning message when checking config failed.
CheckTaskMsgHeader = "fail to check synchronization configuration with type"

CheckTaskSuccess = "check pass!!!"

// CheckSyncConfigFunc holds the CheckSyncConfig function.
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error)
)

func init() {
CheckSyncConfigFunc = CheckSyncConfig
}

// CheckSyncConfig checks synchronization configuration.
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error {
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) {
if len(cfgs) == 0 {
return nil
return "", nil
}

// all `IgnoreCheckingItems` and `Mode` of sub-task are same, so we take first one
Expand All @@ -53,25 +56,29 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt,
}
checkingItems := config.FilterCheckingItems(ignoreCheckingItems)
if len(checkingItems) == 0 {
return nil
return "", nil
}

c := NewChecker(cfgs, checkingItems, errCnt, warnCnt)

if err := c.Init(ctx); err != nil {
return terror.Annotate(err, "fail to initial checker")
return "", terror.Annotate(err, "fail to initial checker")
}
defer c.Close()

pr := make(chan pb.ProcessResult, 1)
c.Process(ctx, pr)
for len(pr) > 0 {
if len(pr) > 0 {
r := <-pr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we always assume there is one result in pr, as stated in

// Process does the main logic and its returning must send a result to pr channel.
// When ctx.Done, stops the process and returns, otherwise the DM-worker will be blocked forever
// When not in processing, call Process to continue or resume the process
Process(ctx context.Context, pr chan pb.ProcessResult)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be seen that checker is designed according to the unit at the beginning. But it is not in fact(not complete unit interface). And I think we don't plan to treat him as a unit in the future yet. So it is a semi-finished product. This is the reason that it looks a lot unreasonable.

I think we can modify these unreasonable places after the end of this feature. Result is also. @Ehco1996

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actully it depends on requirements, i am not sure if we have a plan to merge full-time/sync-time sync_diff in to checker? if so i thinks it's better to treat checker as a unit

// we only want first error
if len(r.Errors) > 0 {
return terror.ErrTaskCheckSyncConfigError.Generate(ErrorMsgHeader, r.Errors[0].Message, string(r.Detail))
return "", terror.ErrTaskCheckSyncConfigError.Generate(CheckTaskMsgHeader, r.Errors[0].Message, string(r.Detail))
}
if len(r.Detail) == 0 {
return CheckTaskSuccess, nil
}
return fmt.Sprintf("%s: no errors but some warnings\n detail: %s", CheckTaskMsgHeader, string(r.Detail)), nil
}

return nil
return "", nil
}
2 changes: 1 addition & 1 deletion dm/dm/ctl/master/check_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func checkTaskFunc(cmd *cobra.Command, _ []string) error {
return err
}

if !common.PrettyPrintResponseWithCheckTask(resp, checker.ErrorMsgHeader) {
if !common.PrettyPrintResponseWithCheckTask(resp, checker.CheckTaskMsgHeader) {
common.PrettyPrintResponse(resp)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/ctl/master/start_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func startTaskFunc(cmd *cobra.Command, _ []string) error {
return err
}

if !common.PrettyPrintResponseWithCheckTask(resp, checker.ErrorMsgHeader) {
if !common.PrettyPrintResponseWithCheckTask(resp, checker.CheckTaskMsgHeader) {
common.PrettyPrintResponse(resp)
}
return nil
Expand Down
9 changes: 7 additions & 2 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,16 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
for i := range subTaskConfigList {
subTaskConfigPList[i] = &subTaskConfigList[i]
}
if err = checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList,
common.DefaultErrorCnt, common.DefaultWarnCnt); err != nil {
msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList,
common.DefaultErrorCnt, common.DefaultWarnCnt)
if err != nil {
_ = c.Error(terror.WithClass(err, terror.ClassDMMaster))
return
}
if len(msg) != 0 {
// TODO: return warning msg with http.StatusCreated and task together
log.L().Warn("openapi pre-check warning before start task", zap.String("warning", msg))
}
// specify only start task on partial sources
var needStartSubTaskList []config.SubTaskConfig
if req.SourceNameList != nil {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,6 @@ func mockTaskQueryStatus(
).Return(queryResp, nil).MaxTimes(maxRetryNum)
}

func mockCheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error {
return nil
func mockCheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) {
return "", nil
}
Loading