Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openapi(dm): add more task config field #4265

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,18 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
// set shard config
if task.ShardMode != nil {
subTaskCfg.IsSharding = true
mode := *task.ShardMode
subTaskCfg.ShardMode = string(mode)
subTaskCfg.ShardMode = string(*task.ShardMode)
} else {
subTaskCfg.IsSharding = false
}
// set timezone
if task.TimeZone != nil {
subTaskCfg.Timezone = *task.TimeZone
}
// set collation_compatible
if task.CollationCompatible != nil {
subTaskCfg.CollationCompatible = string(*task.CollationCompatible)
}
// set online ddl plugin config
subTaskCfg.OnlineDDL = task.EnhanceOnlineSchemaChange
// set case sensitive from source
Expand Down Expand Up @@ -206,6 +213,12 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
if incrCfg.ReplBatch != nil {
subTaskCfg.SyncerConfig.Batch = *incrCfg.ReplBatch
}
if incrCfg.Compact != nil {
subTaskCfg.SyncerConfig.Compact = *incrCfg.Compact
}
if incrCfg.MultipleRows != nil {
subTaskCfg.SyncerConfig.MultipleRows = *incrCfg.MultipleRows
}
}
// set route,blockAllowList,filter config
doCnt := len(tableMigrateRuleMap[sourceCfg.SourceName])
Expand Down Expand Up @@ -422,8 +435,10 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigMap map[string]map[string]SubTaskC
ImportThreads: &oneSubtaskConfig.LoaderConfig.PoolSize,
}
taskSourceConfig.IncrMigrateConf = &openapi.TaskIncrMigrateConf{
ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch,
ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount,
ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch,
Compact: &oneSubtaskConfig.SyncerConfig.Compact,
ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount,
MultipleRows: &oneSubtaskConfig.SyncerConfig.MultipleRows,
}
// set filter rules
filterRuleMap := openapi.Task_BinlogFilterRule{}
Expand Down Expand Up @@ -494,6 +509,9 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigMap map[string]map[string]SubTaskC
if len(filterMap) > 0 {
task.BinlogFilterRule = &filterRuleMap
}
task.TimeZone = &oneSubtaskConfig.Timezone
collationCompatible := openapi.TaskCollationCompatible(oneSubtaskConfig.CollationCompatible)
task.CollationCompatible = &collationCompatible
task.TableMigrateRule = tableMigrateRuleList
taskList = append(taskList, task)
}
Expand Down
24 changes: 23 additions & 1 deletion dm/dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTaskConfig.ShardMode, check.Equals, "")
// check online schema change
c.Assert(subTaskConfig.OnlineDDL, check.Equals, true)
// check timezone
c.Assert(subTaskConfig.Timezone, check.Equals, *task.TimeZone)
// check collation
c.Assert(subTaskConfig.CollationCompatible, check.Equals, string(*task.CollationCompatible))
// check case sensitive
c.Assert(subTaskConfig.CaseSensitive, check.Equals, sourceCfg1.CaseSensitive)
// check from
Expand All @@ -86,8 +90,11 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTaskConfig.LoaderConfig.Dir, check.Equals, fmt.Sprintf(
"%s.%s", *task.SourceConfig.FullMigrateConf.DataDir, task.Name))
c.Assert(subTaskConfig.LoaderConfig.PoolSize, check.Equals, *task.SourceConfig.FullMigrateConf.ImportThreads)
c.Assert(subTaskConfig.SyncerConfig.WorkerCount, check.Equals, *task.SourceConfig.IncrMigrateConf.ReplThreads)
c.Assert(subTaskConfig.SyncerConfig.Compact, check.Equals, *task.SourceConfig.IncrMigrateConf.Compact)
c.Assert(subTaskConfig.SyncerConfig.Batch, check.Equals, *task.SourceConfig.IncrMigrateConf.ReplBatch)
c.Assert(subTaskConfig.SyncerConfig.WorkerCount, check.Equals, *task.SourceConfig.IncrMigrateConf.ReplThreads)
c.Assert(subTaskConfig.SyncerConfig.MultipleRows, check.Equals, *task.SourceConfig.IncrMigrateConf.MultipleRows)

// check route
c.Assert(subTaskConfig.RouteRules, check.HasLen, 1)
rule := subTaskConfig.RouteRules[0]
Expand Down Expand Up @@ -280,6 +287,21 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) {
c.Assert(taskList, check.HasLen, 1)
newTask := taskList[0]

t1, _ := newTask.ToJSON()
t2, _ := task.ToJSON()
c.Assert(string(t1), check.Equals, string(t2))

// println("~~~~~~t1~~~~~~~", task.CollationCompatible, task.TimeZone)
// println("~~~~~~t2~~~~~~~", newTask.CollationCompatible, newTask.TimeZone)

// c.Assert(*task.CollationCompatible, check.Equals, *newTask.CollationCompatible)
// c.Assert(*task.TimeZone, check.Equals, *newTask.TimeZone)
// c.Assert(task.SourceConfig.IncrMigrateConf, check.DeepEquals, newTask.SourceConfig.IncrMigrateConf)

// newTask.TimeZone = task.TimeZone
// newTask.CollationCompatible = task.CollationCompatible
// newTask.SourceConfig.IncrMigrateConf = task.SourceConfig.IncrMigrateConf

c.Assert(task, check.DeepEquals, newTask)
}

Expand Down
9 changes: 8 additions & 1 deletion dm/openapi/fixtures/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ var (
"enhance_online_schema_change": true,
"meta_schema": "dm_meta",
"name": "test",
"time_zone": "Asia/Shanghai",
"collation_compatible": "loose",
"on_duplicate": "error",
"source_config": {
"full_migrate_conf": {
"data_dir": "./exported_data",
"export_threads": 4,
"import_threads": 16
},
"incr_migrate_conf": { "repl_batch": 200, "repl_threads": 32 },
"incr_migrate_conf": {
"repl_threads": 16,
"repl_batch": 100,
"multiple_rows": false,
"compact": false
},
"source_conf": [{ "source_name": "mysql-replica-01" }]
},
"table_migrate_rule": [
Expand Down
171 changes: 88 additions & 83 deletions dm/openapi/gen.server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading