Skip to content

Commit

Permalink
ha(dm): support store task config in Etcd (pingcap#3620)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored and zhaoxinyu committed Dec 29, 2021
1 parent 802ce0d commit 53b3f02
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 1 deletion.
2 changes: 2 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ ErrConfigGenColumnMapping,[code=20046:class=config:scope=internal:level=high], "
ErrConfigInvalidChunkFileSize,[code=20047:class=config:scope=internal:level=high], "Message: invalid `chunk-filesize` %v, Workaround: Please check the `chunk-filesize` config in task configuration file."
ErrConfigOnlineDDLInvalidRegex,[code=20048:class=config:scope=internal:level=high], "Message: config '%s' regex pattern '%s' invalid, reason: %s, Workaround: Please check if params is correctly in the configuration file."
ErrConfigOnlineDDLMistakeRegex,[code=20049:class=config:scope=internal:level=high], "Message: online ddl sql '%s' invalid, table %s fail to match '%s' online ddl regex, Workaround: Please update your `shadow-table-rules` or `trash-table-rules` in the configuration file."
ErrOpenAPITaskConfigExist,[code=20050:class=config:scope=internal:level=low], "Message: the openapi task config for '%s' already exist, Workaround: If you want to override it, please use the overwrite flag."
ErrOpenAPITaskConfigNotExist,[code=20051:class=config:scope=internal:level=low], "Message: the openapi task config for '%s' does not exist"
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
8 changes: 7 additions & 1 deletion dm/dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,19 @@ var (
// tb2: +a +b +c
// tb3: +a +b +c
ShardDDLOptimismDroppedColumnsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/dropped-columns/")

// OpenAPITaskConfigKeyAdapter is used to store the openapi task config (openapi.Task), now it's only used for WebUI.
// openapi.Task is a struct that can be converted to config.StubTaskConfig so if any field of openapi.Task updated
// user should use ha.PutOpenAPITaskConfig(key, openapi.Task,overwrite) to force update the content in etcd.
// k/v: Encode(task-name) -> openapi.Task.
OpenAPITaskConfigKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/openapi-task-config/")
)

func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter,
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter,
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter:
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter, OpenAPITaskConfigKeyAdapter:
return 1
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter,
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
Expand Down
5 changes: 5 additions & 0 deletions dm/dm/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func (t *testCommon) TestKeyAdapter(c *C) {
adapter: UpstreamSubTaskKeyAdapter,
want: "/dm-master/upstream/subtask/6d7973716c31/e4b8ade6968731f09f8084efb88f",
},
{
keys: []string{"task-1"},
adapter: OpenAPITaskConfigKeyAdapter,
want: "/dm-master/openapi-task-config/7461736b2d31",
},
}

for _, ca := range testCases {
Expand Down
12 changes: 12 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,18 @@ description = ""
workaround = "Please update your `shadow-table-rules` or `trash-table-rules` in the configuration file."
tags = ["internal", "high"]

[error.DM-config-20050]
message = "the openapi task config for '%s' already exist"
description = ""
workaround = "If you want to override it, please use the overwrite flag."
tags = ["internal", "low"]

[error.DM-config-20051]
message = "the openapi task config for '%s' does not exist"
description = ""
workaround = ""
tags = ["internal", "low"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
12 changes: 12 additions & 0 deletions dm/openapi/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package openapi

import (
"encoding/json"

"github.com/pingcap/ticdc/dm/pkg/terror"
)

Expand All @@ -30,3 +32,13 @@ func (t *Task) Adjust() error {
}
return nil
}

// FromJSON unmarshal json to task.
func (t *Task) FromJSON(data []byte) error {
return json.Unmarshal(data, t)
}

// ToJSON marshal json to task.
func (t *Task) ToJSON() ([]byte, error) {
return json.Marshal(t)
}
135 changes: 135 additions & 0 deletions dm/pkg/ha/openapi_task_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ha

import (
"context"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/clientv3util"

"github.com/pingcap/ticdc/dm/dm/common"
"github.com/pingcap/ticdc/dm/openapi"
"github.com/pingcap/ticdc/dm/pkg/etcdutil"
"github.com/pingcap/ticdc/dm/pkg/terror"
)

func openAPITaskFromResp(resp *clientv3.GetResponse) (*openapi.Task, error) {
task := &openapi.Task{}
if resp.Count == 0 {
return nil, nil
} else if resp.Count > 1 {
// this should not happen.
return task, terror.ErrConfigMoreThanOne.Generate(resp.Count, "openapi.Task", "")
}
// we make sure only have one task config.
if err := task.FromJSON(resp.Kvs[0].Value); err != nil {
return task, err
}
return task, nil
}

// PutOpenAPITaskConfig puts the openapi task config of task-name.
func PutOpenAPITaskConfig(cli *clientv3.Client, task openapi.Task, overWrite bool) error {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

key := common.OpenAPITaskConfigKeyAdapter.Encode(task.Name)
taskJSON, err := task.ToJSON()
if err != nil {
return err // it should not happen.
}
txn := cli.Txn(ctx)
if !overWrite {
txn = txn.If(clientv3util.KeyMissing(key))
}
resp, err := txn.Then(clientv3.OpPut(key, string(taskJSON))).Commit()
if err != nil {
return err
}
// user don't want to overwrite and key already exists.
if !overWrite && !resp.Succeeded {
return terror.ErrOpenAPITaskConfigExist.Generate(task.Name)
}
return nil
}

// UpdateOpenAPITaskConfig updates the openapi task config by task-name.
func UpdateOpenAPITaskConfig(cli *clientv3.Client, task openapi.Task) error {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

key := common.OpenAPITaskConfigKeyAdapter.Encode(task.Name)
taskJSON, err := task.ToJSON()
if err != nil {
return err // it should not happen.
}
txn := cli.Txn(ctx).If(clientv3util.KeyExists(key)).Then(clientv3.OpPut(key, string(taskJSON)))
resp, err := txn.Commit()
if err != nil {
return err
}
// user want to update a key not exists.
if !resp.Succeeded {
return terror.ErrOpenAPITaskConfigNotExist.Generate(task.Name)
}
return nil
}

// DeleteOpenAPITaskConfig deletes the openapi task config of task-name.
func DeleteOpenAPITaskConfig(cli *clientv3.Client, taskName string) error {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()
if _, err := cli.Delete(ctx, common.OpenAPITaskConfigKeyAdapter.Encode(taskName)); err != nil {
return err
}
return nil
}

// GetOpenAPITaskConfig gets the openapi task config of task-name.
func GetOpenAPITaskConfig(cli *clientv3.Client, taskName string) (*openapi.Task, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

var (
task *openapi.Task
resp *clientv3.GetResponse
err error
)
resp, err = cli.Get(ctx, common.OpenAPITaskConfigKeyAdapter.Encode(taskName))
if err != nil {
return task, err
}
return openAPITaskFromResp(resp)
}

// GetAllOpenAPITaskConfig gets all openapi task config s.
func GetAllOpenAPITaskConfig(cli *clientv3.Client) ([]*openapi.Task, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := cli.Get(ctx, common.OpenAPITaskConfigKeyAdapter.Path(), clientv3.WithPrefix())
if err != nil {
return nil, err
}
tasks := make([]*openapi.Task, resp.Count)
for i, kv := range resp.Kvs {
t := &openapi.Task{}
if err := t.FromJSON(kv.Value); err != nil {
return nil, err
}
tasks[i] = t
}
return tasks, nil
}
91 changes: 91 additions & 0 deletions dm/pkg/ha/openapi_task_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ha

import (
. "github.com/pingcap/check"

"github.com/pingcap/ticdc/dm/openapi"
"github.com/pingcap/ticdc/dm/openapi/fixtures"
"github.com/pingcap/ticdc/dm/pkg/terror"
)

func (t *testForEtcd) TestOpenAPITaskConfigEtcd(c *C) {
defer clearTestInfoOperation(c)

task1, err := fixtures.GenNoShardOpenAPITaskForTest()
task1.Name = "test-1"
c.Assert(err, IsNil)
task2, err := fixtures.GenShardAndFilterOpenAPITaskForTest()
task2.Name = "test-2"
c.Assert(err, IsNil)

// no openapi task config exist.
task1InEtcd, err := GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(task1InEtcd, IsNil)

task2InEtcd, err := GetOpenAPITaskConfig(etcdTestCli, task2.Name)
c.Assert(err, IsNil)
c.Assert(task2InEtcd, IsNil)

tasks, err := GetAllOpenAPITaskConfig(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 0)

// put openapi task config .
c.Assert(PutOpenAPITaskConfig(etcdTestCli, task1, false), IsNil)
c.Assert(PutOpenAPITaskConfig(etcdTestCli, task2, false), IsNil)

task1InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(*task1InEtcd, DeepEquals, task1)

task2InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task2.Name)
c.Assert(err, IsNil)
c.Assert(*task2InEtcd, DeepEquals, task2)

tasks, err = GetAllOpenAPITaskConfig(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)

// put openapi task config again without overwrite will fail
c.Assert(terror.ErrOpenAPITaskConfigExist.Equal(PutOpenAPITaskConfig(etcdTestCli, task1, false)), IsTrue)

// in overwrite mode, it will overwrite the old one.
task1.TaskMode = openapi.TaskTaskModeFull
c.Assert(PutOpenAPITaskConfig(etcdTestCli, task1, true), IsNil)
task1InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(*task1InEtcd, DeepEquals, task1)

// put task config that not exist will fail
task3, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, IsNil)
task3.Name = "test-3"
c.Assert(terror.ErrOpenAPITaskConfigNotExist.Equal(UpdateOpenAPITaskConfig(etcdTestCli, task3)), IsTrue)

// update exist openapi task config will success
task1.TaskMode = openapi.TaskTaskModeAll
c.Assert(UpdateOpenAPITaskConfig(etcdTestCli, task1), IsNil)
task1InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(*task1InEtcd, DeepEquals, task1)

// delete task config
c.Assert(DeleteOpenAPITaskConfig(etcdTestCli, task1.Name), IsNil)
tasks, err = GetAllOpenAPITaskConfig(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
}
4 changes: 4 additions & 0 deletions dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ const (
codeConfigInvalidChunkFileSize
codeConfigOnlineDDLInvalidRegex
codeConfigOnlineDDLMistakeRegex
codeConfigOpenAPITaskConfigExist
codeConfigOpenAPITaskConfigNotExist
)

// Binlog operation error code list.
Expand Down Expand Up @@ -897,6 +899,8 @@ var (
"config '%s' regex pattern '%s' invalid, reason: %s", "Please check if params is correctly in the configuration file.")
ErrConfigOnlineDDLMistakeRegex = New(codeConfigOnlineDDLMistakeRegex, ClassConfig, ScopeInternal, LevelHigh,
"online ddl sql '%s' invalid, table %s fail to match '%s' online ddl regex", "Please update your `shadow-table-rules` or `trash-table-rules` in the configuration file.")
ErrOpenAPITaskConfigExist = New(codeConfigOpenAPITaskConfigExist, ClassConfig, ScopeInternal, LevelLow, "the openapi task config for '%s' already exist", "If you want to override it, please use the overwrite flag.")
ErrOpenAPITaskConfigNotExist = New(codeConfigOpenAPITaskConfigNotExist, ClassConfig, ScopeInternal, LevelLow, "the openapi task config for '%s' does not exist", "")

// Binlog operation error.
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down

0 comments on commit 53b3f02

Please sign in to comment.