From 750816fd986e42aad5c00d711850093e8c3a5a23 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 12 Jan 2022 12:35:42 +0800 Subject: [PATCH] checker(dm): make `start-task/resume-task/check-task` return pre-check result only warning (#4118) ref pingcap/tiflow#3608 --- dm/.golangci.yml | 3 + dm/checker/check_test.go | 98 ++++++++++++++----- dm/checker/checker.go | 97 +++++++++--------- dm/checker/cmd.go | 27 +++-- dm/dm/ctl/master/check_task.go | 2 +- dm/dm/ctl/master/start_task.go | 2 +- dm/dm/master/openapi.go | 9 +- dm/dm/master/openapi_test.go | 4 +- dm/dm/master/server.go | 71 ++++++++------ dm/dm/master/server_test.go | 4 +- dm/pkg/checker/table_structure.go | 2 +- dm/tests/dmctl_basic/check_list/check_task.sh | 16 ++- dm/tests/dmctl_basic/conf/only_warning.yaml | 44 +++++++++ dm/tests/dmctl_basic/data/db1.prepare.sql | 2 + dm/tests/dmctl_basic/run.sh | 9 ++ 15 files changed, 265 insertions(+), 125 deletions(-) create mode 100644 dm/tests/dmctl_basic/conf/only_warning.yaml diff --git a/dm/.golangci.yml b/dm/.golangci.yml index 7385bd63205..df9b4cd30c8 100644 --- a/dm/.golangci.yml +++ b/dm/.golangci.yml @@ -81,6 +81,9 @@ linters: # - maligned linters-settings: + dupl: + # tokens count to trigger issue, 150 by default + threshold: 200 govet: # report about shadowed variables check-shadowing: true diff --git a/dm/checker/check_test.go b/dm/checker/check_test.go index 395a6375a02..87452bf6f77 100644 --- a/dm/checker/check_test.go +++ b/dm/checker/check_test.go @@ -59,14 +59,18 @@ func ignoreExcept(itemMap map[string]struct{}) []string { } func (s *testCheckerSuite) TestIgnoreAllCheckingItems(c *tc.C) { - c.Assert(CheckSyncConfig(context.Background(), nil, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err := CheckSyncConfig(context.Background(), nil, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.IsNil) cfgs := []*config.SubTaskConfig{ { IgnoreCheckingItems: []string{config.AllChecking}, }, } - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.IsNil) } // nolint:dupl @@ -79,7 +83,8 @@ func (s *testCheckerSuite) TestDumpPrivilegeChecking(c *tc.C) { mock := conn.InitMockDB(c) mock.ExpectQuery("SHOW GRANTS").WillReturnRows(sqlmock.NewRows([]string{"Grants for User"}). AddRow("GRANT USAGE ON *.* TO 'haha'@'%'")) - err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) c.Assert(err, tc.ErrorMatches, "(.|\n)*lack.*RELOAD(.|\n)*") c.Assert(err, tc.ErrorMatches, "(.|\n)*lack.*Select(.|\n)*") @@ -87,7 +92,9 @@ func (s *testCheckerSuite) TestDumpPrivilegeChecking(c *tc.C) { mock.ExpectQuery("SHOW GRANTS").WillReturnRows(sqlmock.NewRows([]string{"Grants for User"}). AddRow("GRANT RELOAD,SELECT ON *.* TO 'haha'@'%'")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } // nolint:dupl @@ -100,7 +107,8 @@ func (s *testCheckerSuite) TestReplicationPrivilegeChecking(c *tc.C) { mock := conn.InitMockDB(c) mock.ExpectQuery("SHOW GRANTS").WillReturnRows(sqlmock.NewRows([]string{"Grants for User"}). AddRow("GRANT USAGE ON *.* TO 'haha'@'%'")) - err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) c.Assert(err, tc.ErrorMatches, "(.|\n)*lack.*REPLICATION SLAVE(.|\n)*") c.Assert(err, tc.ErrorMatches, "(.|\n)*lack.*REPLICATION CLIENT(.|\n)*") @@ -108,7 +116,9 @@ func (s *testCheckerSuite) TestReplicationPrivilegeChecking(c *tc.C) { mock.ExpectQuery("SHOW GRANTS").WillReturnRows(sqlmock.NewRows([]string{"Grants for User"}). AddRow("GRANT REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'haha'@'%'")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestVersionChecking(c *tc.C) { @@ -121,25 +131,33 @@ func (s *testCheckerSuite) TestVersionChecking(c *tc.C) { mock := conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.26-log")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) mock = conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "10.1.29-MariaDB")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) mock = conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.5.26-log")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*version required at least .* but got 5.5.26(.|\n)*") + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*version required at least .* but got 5.5.26(.|\n)*") mock = conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "10.0.0-MariaDB")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*version required at least .* but got 10.0.0(.|\n)*") + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*version required at least .* but got 10.0.0(.|\n)*") } func (s *testCheckerSuite) TestServerIDChecking(c *tc.C) { @@ -152,13 +170,17 @@ func (s *testCheckerSuite) TestServerIDChecking(c *tc.C) { mock := conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'server_id'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("server_id", "0")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*please set server_id greater than 0(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*please set server_id greater than 0(.|\n)*") mock = conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'server_id'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("server_id", "1")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestBinlogEnableChecking(c *tc.C) { @@ -171,13 +193,17 @@ func (s *testCheckerSuite) TestBinlogEnableChecking(c *tc.C) { mock := conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'log_bin'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("log_bin", "OFF")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*log_bin is OFF, and should be ON(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*log_bin is OFF, and should be ON(.|\n)*") mock = conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'log_bin'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("log_bin", "ON")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestBinlogFormatChecking(c *tc.C) { @@ -190,13 +216,17 @@ func (s *testCheckerSuite) TestBinlogFormatChecking(c *tc.C) { mock := conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'binlog_format'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("binlog_format", "STATEMENT")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*binlog_format is STATEMENT, and should be ROW(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*binlog_format is STATEMENT, and should be ROW(.|\n)*") mock = conn.InitMockDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'binlog_format'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("binlog_format", "ROW")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestBinlogRowImageChecking(c *tc.C) { @@ -211,7 +241,9 @@ func (s *testCheckerSuite) TestBinlogRowImageChecking(c *tc.C) { AddRow("version", "5.7.26-log")) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("binlog_row_image", "MINIMAL")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*binlog_row_image is MINIMAL, and should be FULL(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*binlog_row_image is MINIMAL, and should be FULL(.|\n)*") mock = conn.InitMockDB(c) @@ -219,7 +251,9 @@ func (s *testCheckerSuite) TestBinlogRowImageChecking(c *tc.C) { AddRow("version", "10.1.29-MariaDB")) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("binlog_row_image", "FULL")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestTableSchemaChecking(c *tc.C) { @@ -251,7 +285,9 @@ func (s *testCheckerSuite) TestTableSchemaChecking(c *tc.C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2))) mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*primary/unique key does not exist(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*primary/unique key does not exist(.|\n)*") mock = conn.InitMockDB(c) @@ -261,7 +297,9 @@ func (s *testCheckerSuite) TestTableSchemaChecking(c *tc.C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb2))) mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) { @@ -299,7 +337,9 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1))) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb2))) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*different column definition(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*different column definition(.|\n)*") mock = conn.InitMockDB(c) @@ -308,7 +348,9 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1))) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2))) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) { @@ -351,7 +393,9 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1))) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2))) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*instance table .* of sharding .* have auto-increment key(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*instance table .* of sharding .* have auto-increment key(.|\n)*") mock = conn.InitMockDB(c) mock.ExpectQuery("SHOW DATABASES").WillReturnRows(sqlmock.NewRows([]string{"DATABASE"}).AddRow(schema)) @@ -359,7 +403,9 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb1))) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb2))) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.IsNil) + msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(msg, tc.Equals, CheckTaskSuccess) + c.Assert(err, tc.IsNil) } func (s *testCheckerSuite) TestSameTargetTableDetection(c *tc.C) { @@ -400,5 +446,7 @@ func (s *testCheckerSuite) TestSameTargetTableDetection(c *tc.C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1))) mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2))) - c.Assert(CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt), tc.ErrorMatches, "(.|\n)*same table name in case-insensitive(.|\n)*") + msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt) + c.Assert(len(msg), tc.Equals, 0) + c.Assert(err, tc.ErrorMatches, "(.|\n)*same table name in case-insensitive(.|\n)*") } diff --git a/dm/checker/checker.go b/dm/checker/checker.go index 8bf03107a77..92b088922bb 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -283,59 +283,59 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) { errs = append(errs, unit.NewProcessError(err)) } else if !result.Summary.Passed { errs = append(errs, unit.NewProcessError(errors.New("check was failed, please see detail"))) - warnLeft, errLeft := c.warnCnt, c.errCnt + } + warnLeft, errLeft := c.warnCnt, c.errCnt - // remove success result if not pass - results := result.Results[:0] - for _, r := range result.Results { - if r.State == checker.StateSuccess { - continue - } + // remove success result if not pass + results := result.Results[:0] + for _, r := range result.Results { + if r.State == checker.StateSuccess { + continue + } - // handle results without r.Errors - if len(r.Errors) == 0 { - switch r.State { - case checker.StateWarning: - if warnLeft == 0 { - continue - } - warnLeft-- - results = append(results, r) - case checker.StateFailure: - if errLeft == 0 { - continue - } - errLeft-- - results = append(results, r) + // handle results without r.Errors + if len(r.Errors) == 0 { + switch r.State { + case checker.StateWarning: + if warnLeft == 0 { + continue } - continue + warnLeft-- + results = append(results, r) + case checker.StateFailure: + if errLeft == 0 { + continue + } + errLeft-- + results = append(results, r) } + continue + } - subErrors := make([]*checker.Error, 0, len(r.Errors)) - for _, e := range r.Errors { - switch e.Severity { - case checker.StateWarning: - if warnLeft == 0 { - continue - } - warnLeft-- - subErrors = append(subErrors, e) - case checker.StateFailure: - if errLeft == 0 { - continue - } - errLeft-- - subErrors = append(subErrors, e) + subErrors := make([]*checker.Error, 0, len(r.Errors)) + for _, e := range r.Errors { + switch e.Severity { + case checker.StateWarning: + if warnLeft == 0 { + continue } + warnLeft-- + subErrors = append(subErrors, e) + case checker.StateFailure: + if errLeft == 0 { + continue + } + errLeft-- + subErrors = append(subErrors, e) } - // skip display an empty Result - if len(subErrors) > 0 { - r.Errors = subErrors - results = append(results, r) - } } - result.Results = results + // skip display an empty Result + if len(subErrors) > 0 { + r.Errors = subErrors + results = append(results, r) + } } + result.Results = results c.updateInstruction(result) @@ -345,9 +345,12 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) { default: } - rawResult, err := json.MarshalIndent(result, "\t", "\t") - if err != nil { - rawResult = []byte(fmt.Sprintf("marshal error %v", err)) + var rawResult []byte + if result.Summary.Successful != result.Summary.Total { + rawResult, err = json.MarshalIndent(result, "\t", "\t") + if err != nil { + rawResult = []byte(fmt.Sprintf("marshal error %v", err)) + } } c.result.Lock() diff --git a/dm/checker/cmd.go b/dm/checker/cmd.go index 66064747944..6ba0ab7c06d 100644 --- a/dm/checker/cmd.go +++ b/dm/checker/cmd.go @@ -15,6 +15,7 @@ package checker import ( "context" + "fmt" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/dm/pb" @@ -22,11 +23,13 @@ import ( ) var ( - // ErrorMsgHeader used as the header of the error message when checking config failed. - ErrorMsgHeader = "fail to check synchronization configuration with type" + // CheckTaskMsgHeader used as the header of the error/warning message when checking config failed. + CheckTaskMsgHeader = "fail to check synchronization configuration with type" + + CheckTaskSuccess = "check pass!!!" // CheckSyncConfigFunc holds the CheckSyncConfig function. - CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error + CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) ) func init() { @@ -34,9 +37,9 @@ func init() { } // CheckSyncConfig checks synchronization configuration. -func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error { +func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) { if len(cfgs) == 0 { - return nil + return "", nil } // all `IgnoreCheckingItems` and `Mode` of sub-task are same, so we take first one @@ -53,25 +56,29 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, } checkingItems := config.FilterCheckingItems(ignoreCheckingItems) if len(checkingItems) == 0 { - return nil + return "", nil } c := NewChecker(cfgs, checkingItems, errCnt, warnCnt) if err := c.Init(ctx); err != nil { - return terror.Annotate(err, "fail to initial checker") + return "", terror.Annotate(err, "fail to initial checker") } defer c.Close() pr := make(chan pb.ProcessResult, 1) c.Process(ctx, pr) - for len(pr) > 0 { + if len(pr) > 0 { r := <-pr // we only want first error if len(r.Errors) > 0 { - return terror.ErrTaskCheckSyncConfigError.Generate(ErrorMsgHeader, r.Errors[0].Message, string(r.Detail)) + return "", terror.ErrTaskCheckSyncConfigError.Generate(CheckTaskMsgHeader, r.Errors[0].Message, string(r.Detail)) + } + if len(r.Detail) == 0 { + return CheckTaskSuccess, nil } + return fmt.Sprintf("%s: no errors but some warnings\n detail: %s", CheckTaskMsgHeader, string(r.Detail)), nil } - return nil + return "", nil } diff --git a/dm/dm/ctl/master/check_task.go b/dm/dm/ctl/master/check_task.go index 5cd0b5c2ae9..b3536f66c37 100644 --- a/dm/dm/ctl/master/check_task.go +++ b/dm/dm/ctl/master/check_task.go @@ -78,7 +78,7 @@ func checkTaskFunc(cmd *cobra.Command, _ []string) error { return err } - if !common.PrettyPrintResponseWithCheckTask(resp, checker.ErrorMsgHeader) { + if !common.PrettyPrintResponseWithCheckTask(resp, checker.CheckTaskMsgHeader) { common.PrettyPrintResponse(resp) } return nil diff --git a/dm/dm/ctl/master/start_task.go b/dm/dm/ctl/master/start_task.go index 21e286a41a5..c748c7419e5 100644 --- a/dm/dm/ctl/master/start_task.go +++ b/dm/dm/ctl/master/start_task.go @@ -97,7 +97,7 @@ func startTaskFunc(cmd *cobra.Command, _ []string) error { return err } - if !common.PrettyPrintResponseWithCheckTask(resp, checker.ErrorMsgHeader) { + if !common.PrettyPrintResponseWithCheckTask(resp, checker.CheckTaskMsgHeader) { common.PrettyPrintResponse(resp) } return nil diff --git a/dm/dm/master/openapi.go b/dm/dm/master/openapi.go index 2cb1c42eccf..9cb786dd881 100644 --- a/dm/dm/master/openapi.go +++ b/dm/dm/master/openapi.go @@ -445,11 +445,16 @@ func (s *Server) DMAPIStartTask(c *gin.Context) { for i := range subTaskConfigList { subTaskConfigPList[i] = &subTaskConfigList[i] } - if err = checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList, - common.DefaultErrorCnt, common.DefaultWarnCnt); err != nil { + msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList, + common.DefaultErrorCnt, common.DefaultWarnCnt) + if err != nil { _ = c.Error(terror.WithClass(err, terror.ClassDMMaster)) return } + if len(msg) != 0 { + // TODO: return warning msg with http.StatusCreated and task together + log.L().Warn("openapi pre-check warning before start task", zap.String("warning", msg)) + } // specify only start task on partial sources var needStartSubTaskList []config.SubTaskConfig if req.SourceNameList != nil { diff --git a/dm/dm/master/openapi_test.go b/dm/dm/master/openapi_test.go index ebafa9658ad..c4c0191a02c 100644 --- a/dm/dm/master/openapi_test.go +++ b/dm/dm/master/openapi_test.go @@ -880,6 +880,6 @@ func mockTaskQueryStatus( ).Return(queryResp, nil).MaxTimes(maxRetryNum) } -func mockCheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) error { - return nil +func mockCheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) { + return "", nil } diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index 03d3ad5c2b8..4a039f8b874 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -427,12 +427,19 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S } resp := &pb.StartTaskResponse{} - cfg, stCfgs, err := s.generateSubTask(ctx, req.Task, ctlcommon.DefaultErrorCnt, ctlcommon.DefaultWarnCnt) + cfg, stCfgs, err := s.generateSubTask(ctx, req.Task) if err != nil { resp.Msg = err.Error() // nolint:nilerr return resp, nil } + msg, err := checker.CheckSyncConfigFunc(ctx, stCfgs, ctlcommon.DefaultErrorCnt, ctlcommon.DefaultWarnCnt) + if err != nil { + resp.Msg = terror.WithClass(err, terror.ClassDMMaster).Error() + return resp, nil + } + resp.Msg = msg + log.L().Info("", zap.String("task name", cfg.Name), zap.String("task", cfg.JSON()), zap.String("request", "StartTask")) sourceRespCh := make(chan *pb.CommonWorkerResponse, len(stCfgs)) @@ -473,7 +480,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S // use same latch for remove-meta and start-task release, err3 = s.scheduler.AcquireSubtaskLatch(cfg.Name) if err3 != nil { - resp.Msg = terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", cfg.Name).Error() + resp.Msg += terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", cfg.Name).Error() // nolint:nilerr return resp, nil } @@ -481,19 +488,19 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S latched = true if scm := s.scheduler.GetSubTaskCfgsByTask(cfg.Name); len(scm) > 0 { - resp.Msg = terror.Annotate(terror.ErrSchedulerSubTaskExist.Generate(cfg.Name, sources), + resp.Msg += terror.Annotate(terror.ErrSchedulerSubTaskExist.Generate(cfg.Name, sources), "while remove-meta is true").Error() return resp, nil } err = s.removeMetaData(ctx, cfg.Name, cfg.MetaSchema, cfg.TargetDB) if err != nil { - resp.Msg = terror.Annotate(err, "while removing metadata").Error() + resp.Msg += terror.Annotate(err, "while removing metadata").Error() return resp, nil } } err = s.scheduler.AddSubTasks(latched, subtaskCfgPointersToInstances(stCfgs...)...) if err != nil { - resp.Msg = err.Error() + resp.Msg += err.Error() // nolint:nilerr return resp, nil } @@ -506,7 +513,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S resp.Result = true if cfg.RemoveMeta { - resp.Msg = "`remove-meta` in task config is deprecated, please use `start-task ... --remove-meta` instead" + resp.Msg += "`remove-meta` in task config is deprecated, please use `start-task ... --remove-meta` instead" } sourceResps = s.getSourceRespsAfterOperation(ctx, cfg.Name, sources, []string{}, req) } @@ -619,14 +626,20 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb return resp2, err2 } - cfg, stCfgs, err := s.generateSubTask(ctx, req.Task, ctlcommon.DefaultErrorCnt, ctlcommon.DefaultWarnCnt) + resp := &pb.UpdateTaskResponse{} + cfg, stCfgs, err := s.generateSubTask(ctx, req.Task) if err != nil { + resp.Msg = err.Error() // nolint:nilerr - return &pb.UpdateTaskResponse{ - Result: false, - Msg: err.Error(), - }, nil + return resp, nil + } + + msg, err := checker.CheckSyncConfigFunc(ctx, stCfgs, ctlcommon.DefaultErrorCnt, ctlcommon.DefaultWarnCnt) + if err != nil { + resp.Msg = terror.WithClass(err, terror.ClassDMMaster).Error() + return resp, nil } + resp.Msg = msg log.L().Info("update task", zap.String("task name", cfg.Name), zap.Stringer("task", cfg)) workerRespCh := make(chan *pb.CommonWorkerResponse, len(stCfgs)+len(req.Sources)) @@ -665,10 +678,9 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb workerResps = append(workerResps, workerRespMap[worker]) } - return &pb.UpdateTaskResponse{ - Result: true, - Sources: workerResps, - }, nil + resp.Result = true + resp.Sources = workerResps + return resp, nil } type hasWokers interface { @@ -1177,19 +1189,23 @@ func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.C return resp2, err2 } - _, _, err := s.generateSubTask(ctx, req.Task, req.ErrCnt, req.WarnCnt) + resp := &pb.CheckTaskResponse{} + _, stCfgs, err := s.generateSubTask(ctx, req.Task) if err != nil { + resp.Msg = err.Error() // nolint:nilerr - return &pb.CheckTaskResponse{ - Result: false, - Msg: err.Error(), - }, nil + return resp, nil } - return &pb.CheckTaskResponse{ - Result: true, - Msg: "check pass!!!", - }, nil + msg, err := checker.CheckSyncConfigFunc(ctx, stCfgs, req.ErrCnt, req.WarnCnt) + if err != nil { + resp.Msg = terror.WithClass(err, terror.ClassDMMaster).Error() + return resp, nil + } + resp.Msg = msg + resp.Result = true + + return resp, nil } func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*config.SourceConfig, error) { @@ -1436,7 +1452,7 @@ func (s *Server) OperateLeader(ctx context.Context, req *pb.OperateLeaderRequest }, nil } -func (s *Server) generateSubTask(ctx context.Context, task string, errCnt, warnCnt int64) (*config.TaskConfig, []*config.SubTaskConfig, error) { +func (s *Server) generateSubTask(ctx context.Context, task string) (*config.TaskConfig, []*config.SubTaskConfig, error) { cfg := config.NewTaskConfig() err := cfg.Decode(task) if err != nil { @@ -1455,11 +1471,6 @@ func (s *Server) generateSubTask(ctx context.Context, task string, errCnt, warnC return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } - err = checker.CheckSyncConfigFunc(ctx, stCfgs, errCnt, warnCnt) - if err != nil { - return nil, nil, terror.WithClass(err, terror.ClassDMMaster) - } - return cfg, stCfgs, nil } diff --git a/dm/dm/master/server_test.go b/dm/dm/master/server_test.go index bccc84d6ea2..75cd4b345ce 100644 --- a/dm/dm/master/server_test.go +++ b/dm/dm/master/server_test.go @@ -911,8 +911,8 @@ func (t *testMaster) TestStartTask(c *check.C) { // test start task, but the first step check-task fails bakCheckSyncConfigFunc := checker.CheckSyncConfigFunc - checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig, _, _ int64) error { - return errors.New(errCheckSyncConfig) + checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig, _, _ int64) (string, error) { + return "", errors.New(errCheckSyncConfig) } defer func() { checker.CheckSyncConfigFunc = bakCheckSyncConfigFunc diff --git a/dm/pkg/checker/table_structure.go b/dm/pkg/checker/table_structure.go index 3d7cdec915b..48b7044078a 100644 --- a/dm/pkg/checker/table_structure.go +++ b/dm/pkg/checker/table_structure.go @@ -125,7 +125,7 @@ func (c *TablesChecker) Check(ctx context.Context) *Result { for _, option := range opts { switch option.state { case StateWarning: - if len(r.State) == 0 { + if r.State != StateFailure { r.State = StateWarning } e := NewError(tableMsg + option.errMessage) diff --git a/dm/tests/dmctl_basic/check_list/check_task.sh b/dm/tests/dmctl_basic/check_list/check_task.sh index 6f2200459b2..333c4be8a91 100644 --- a/dm/tests/dmctl_basic/check_list/check_task.sh +++ b/dm/tests/dmctl_basic/check_list/check_task.sh @@ -36,31 +36,39 @@ function check_task_error_database_config() { } function check_task_error_count() { + task_conf=$1 # 10 errors run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "check-task $cur/conf/dm-task3.yaml" \ + "check-task $task_conf" \ "\"result\": false" 1 \ "\"failed\": 2" 1 \ "\"state\": \"fail\"" 2 # 1 error run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "check-task $cur/conf/dm-task3.yaml -e 1" \ + "check-task $task_conf -e 1" \ "\"result\": false" 1 \ "\"failed\": 2" 1 \ "\"state\": \"fail\"" 1 # 100 errors run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "check-task $cur/conf/dm-task3.yaml -e 100 -w 1" \ + "check-task $task_conf -e 100 -w 1" \ "\"result\": false" 1 \ "\"failed\": 2" 1 \ "\"state\": \"fail\"" 2 # 0 error run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "check-task $cur/conf/dm-task3.yaml -e 0" \ + "check-task $task_conf -e 0" \ "\"result\": false" 1 \ "\"failed\": 2" 1 \ "\"state\": \"fail\"" 0 } + +function check_task_only_warning() { + task_conf=$1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "check-task $task_conf" \ + "\"state\": \"warn\"" 1 +} diff --git a/dm/tests/dmctl_basic/conf/only_warning.yaml b/dm/tests/dmctl_basic/conf/only_warning.yaml new file mode 100644 index 00000000000..b8bee809cb7 --- /dev/null +++ b/dm/tests/dmctl_basic/conf/only_warning.yaml @@ -0,0 +1,44 @@ +--- +name: pre_check_only_warning +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +enable-heartbeat: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +block-allow-list: + instance: + do-dbs: ["dmctl"] + do-tables: + - db-name: "dmctl" + tbl-name: "~^only_warning$" + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 + checkpoint-flush-interval: 1 diff --git a/dm/tests/dmctl_basic/data/db1.prepare.sql b/dm/tests/dmctl_basic/data/db1.prepare.sql index f4e971c9eb8..289b61d977d 100644 --- a/dm/tests/dmctl_basic/data/db1.prepare.sql +++ b/dm/tests/dmctl_basic/data/db1.prepare.sql @@ -25,3 +25,5 @@ INSERT INTO `dmctl`.`t_1` (`b`,`c`,`d`,`id`) VALUES (800180420,'JuUIxUacksp','sX create table tb_1(a INT, b INT); create table tb_2(a INT, c INT); + +CREATE TABLE only_warning (id bigint, b int, primary key id(id), FOREIGN KEY (b) REFERENCES t_1(b)); \ No newline at end of file diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh index b43e4fd6193..a1d82ee7dea 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -282,6 +282,15 @@ function run() { check_task_not_pass $cur/conf/dm-task2.yaml check_task_error_count $cur/conf/dm-task3.yaml + echo "check_task_only_warning" + check_task_only_warning $cur/conf/only_warning.yaml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/only_warning.yaml" \ + "\"state\": \"warn\"" 1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task $cur/conf/only_warning.yaml" \ + "\"result\": true" 2 + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task-error-database-config.yaml sed -i "s/password: \"\"/password: \"wrond password\"/g" $WORK_DIR/dm-task-error-database-config.yaml check_task_error_database_config $WORK_DIR/dm-task-error-database-config.yaml